ivankelly closed pull request #898: Bookies should prioritize recovery 
reads/writes
URL: https://github.com/apache/bookkeeper/pull/898
 
 
   

This is a PR merged from a forked repository.
Because GitHub does not show the original diff of a foreign (forked) pull
request once it has been merged, the diff is reproduced below for the sake
of provenance:

diff --git a/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto 
b/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
index ffd0f422c..d4a7d2ea8 100644
--- a/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
+++ b/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
@@ -72,6 +72,7 @@ message BKPacketHeader {
     required ProtocolVersion version = 1;
     required OperationType operation = 2;
     required uint64 txnId = 3;
+    optional uint32 priority = 4 [default = 0];
 }
 
 message Request {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index ea43a9646..027a04dda 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -95,6 +95,7 @@
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.replication.AuditorElector;
 import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -825,7 +826,7 @@ int runCmd(CommandLine cmdLine) throws Exception {
 
                                 buffer.release();
                                 future.complete(null);
-                            }, null);
+                                }, null, BookieProtocol.FLAG_NONE);
 
                         try {
                             future.get();
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index dcface641..5eb00d2be 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -386,7 +386,8 @@ public boolean hasNext() {
                 try {
                     CompletableFuture<Enumeration<LedgerEntry>> result = new 
CompletableFuture<>();
 
-                    handle.asyncReadEntriesInternal(nextEntryId, nextEntryId, 
new SyncReadCallback(result), null);
+                    handle.asyncReadEntriesInternal(nextEntryId, nextEntryId,
+                                                    new 
SyncReadCallback(result), null, false);
 
                     currentEntry = 
SyncCallbackUtils.waitForResult(result).nextElement();
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
index 049492bab..6f232cf61 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
@@ -33,6 +33,7 @@
 import org.apache.bookkeeper.client.BKException.Code;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
 import org.slf4j.Logger;
@@ -198,8 +199,8 @@ private void verifyLedgerFragment(LedgerFragment fragment,
         } else if (firstStored == lastStored) {
             ReadManyEntriesCallback manycb = new ReadManyEntriesCallback(1,
                     fragment, cb);
-            bookieClient.readEntry(bookie, fragment
-                    .getLedgerId(), firstStored, manycb, null);
+            bookieClient.readEntry(bookie, fragment.getLedgerId(), firstStored,
+                                   manycb, null, BookieProtocol.FLAG_NONE);
         } else {
             if (lastStored <= firstStored) {
                 cb.operationComplete(Code.IncorrectParameterException, null);
@@ -239,9 +240,8 @@ private void verifyLedgerFragment(LedgerFragment fragment,
             }
             ReadManyEntriesCallback manycb = new 
ReadManyEntriesCallback(entriesToBeVerified.size(),
                     fragment, cb);
-
             for (Long entryID: entriesToBeVerified) {
-                bookieClient.readEntry(bookie, fragment.getLedgerId(), 
entryID, manycb, null);
+                bookieClient.readEntry(bookie, fragment.getLedgerId(), 
entryID, manycb, null, BookieProtocol.FLAG_NONE);
             }
         }
     }
@@ -382,7 +382,8 @@ public void operationComplete(int rc, Boolean result) {
                 DistributionSchedule.WriteSet writeSet = 
lh.getDistributionSchedule().getWriteSet(entryToRead);
                 for (int i = 0; i < writeSet.size(); i++) {
                     BookieSocketAddress addr = 
curEnsemble.get(writeSet.get(i));
-                    bookieClient.readEntry(addr, lh.getId(), entryToRead, 
eecb, null);
+                    bookieClient.readEntry(addr, lh.getId(), entryToRead,
+                                           eecb, null, 
BookieProtocol.FLAG_NONE);
                 }
                 writeSet.recycle();
                 return;
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 8d57117d1..bdc547f27 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -652,7 +652,7 @@ public void asyncReadEntries(long firstEntry, long 
lastEntry, ReadCallback cb, O
             return;
         }
 
-        asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx);
+        asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx, false);
     }
 
     /**
@@ -691,7 +691,7 @@ public void asyncReadUnconfirmedEntries(long firstEntry, 
long lastEntry, ReadCal
             return;
         }
 
-        asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx);
+        asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx, false);
     }
 
     /**
@@ -717,7 +717,7 @@ public void asyncReadUnconfirmedEntries(long firstEntry, 
long lastEntry, ReadCal
             return FutureUtils.exception(new BKReadException());
         }
 
-        return readEntriesInternalAsync(firstEntry, lastEntry);
+        return readEntriesInternalAsync(firstEntry, lastEntry, false);
     }
 
     /**
@@ -752,12 +752,13 @@ public void asyncReadUnconfirmedEntries(long firstEntry, 
long lastEntry, ReadCal
             return FutureUtils.exception(new BKIncorrectParameterException());
         }
 
-        return readEntriesInternalAsync(firstEntry, lastEntry);
+        return readEntriesInternalAsync(firstEntry, lastEntry, false);
     }
 
-    void asyncReadEntriesInternal(long firstEntry, long lastEntry, 
ReadCallback cb, Object ctx) {
+    void asyncReadEntriesInternal(long firstEntry, long lastEntry, 
ReadCallback cb,
+                                  Object ctx, boolean isRecoveryRead) {
         if (!bk.isClosed()) {
-            readEntriesInternalAsync(firstEntry, lastEntry)
+            readEntriesInternalAsync(firstEntry, lastEntry, isRecoveryRead)
                 .whenCompleteAsync(new FutureEventListener<LedgerEntries>() {
                     @Override
                     public void onSuccess(LedgerEntries entries) {
@@ -802,7 +803,7 @@ public void asyncReadLastEntry(ReadCallback cb, Object ctx) 
{
             // Ledger was empty, so there is no last entry to read
             cb.readComplete(BKException.Code.NoSuchEntryException, this, null, 
ctx);
         } else {
-            asyncReadEntriesInternal(lastEntryId, lastEntryId, cb, ctx);
+            asyncReadEntriesInternal(lastEntryId, lastEntryId, cb, ctx, false);
         }
     }
 
@@ -821,8 +822,9 @@ public LedgerEntry readLastEntry()
     }
 
     CompletableFuture<LedgerEntries> readEntriesInternalAsync(long firstEntry,
-                                                              long lastEntry) {
-        PendingReadOp op = new PendingReadOp(this, bk.getScheduler(), 
firstEntry, lastEntry);
+                                                              long lastEntry,
+                                                              boolean 
isRecoveryRead) {
+        PendingReadOp op = new PendingReadOp(this, bk.getScheduler(), 
firstEntry, lastEntry, isRecoveryRead);
         if (!bk.isClosed()) {
             bk.getMainWorkerPool().submitOrdered(ledgerId, op);
         } else {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index adf67a8d5..67334aaab 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -18,6 +18,9 @@
 package org.apache.bookkeeper.client;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.bookkeeper.proto.BookieProtocol.FLAG_HIGH_PRIORITY;
+import static org.apache.bookkeeper.proto.BookieProtocol.FLAG_NONE;
+import static org.apache.bookkeeper.proto.BookieProtocol.FLAG_RECOVERY_ADD;
 
 import com.google.common.collect.ImmutableMap;
 
@@ -32,7 +35,6 @@
 
 import org.apache.bookkeeper.client.AsyncCallback.AddCallbackWithLatency;
 import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.OpStatsLogger;
@@ -124,7 +126,7 @@ long getEntryId() {
     }
 
     void sendWriteRequest(int bookieIndex) {
-        int flags = isRecoveryAdd ? BookieProtocol.FLAG_RECOVERY_ADD : 
BookieProtocol.FLAG_NONE;
+        int flags = isRecoveryAdd ? FLAG_RECOVERY_ADD | FLAG_HIGH_PRIORITY : 
FLAG_NONE;
 
         
lh.bk.getBookieClient().addEntry(lh.metadata.currentEnsemble.get(bookieIndex), 
lh.ledgerId, lh.ledgerKey,
                 entryId, toSend, this, bookieIndex, flags);
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index 0276ef4d2..7f2860bc7 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -40,6 +40,7 @@
 import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
 import org.apache.bookkeeper.common.util.SafeRunnable;
 import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieProtocol;
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallbackCtx;
 import org.apache.bookkeeper.proto.checksum.DigestManager;
@@ -77,6 +78,7 @@
 
     final int requiredBookiesMissingEntryForRecovery;
     final boolean isRecoveryRead;
+
     boolean parallelRead = false;
     final AtomicBoolean complete = new AtomicBoolean(false);
 
@@ -568,8 +570,10 @@ void sendReadTo(int bookieIndex, BookieSocketAddress to, 
LedgerEntryRequest entr
             lh.throttler.acquire();
         }
 
+        int flags = isRecoveryRead ? BookieProtocol.FLAG_HIGH_PRIORITY : 
BookieProtocol.FLAG_NONE;
         lh.bk.getBookieClient().readEntry(to, lh.ledgerId, 
entry.entryImpl.getEntryId(),
-                                     this, new ReadContext(bookieIndex, to, 
entry));
+                                          this, new ReadContext(bookieIndex, 
to, entry),
+                                          flags);
     }
 
     @Override
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
index d0e116e39..32d7ffe6f 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
@@ -62,17 +62,17 @@ public void initiate() {
             
lh.bk.getBookieClient().readEntry(lh.metadata.currentEnsemble.get(i),
                                          lh.ledgerId,
                                          BookieProtocol.LAST_ADD_CONFIRMED,
-                                         this, i);
+                                         this, i, BookieProtocol.FLAG_NONE);
         }
     }
 
     public void initiateWithFencing() {
         for (int i = 0; i < lh.metadata.currentEnsemble.size(); i++) {
-            
lh.bk.getBookieClient().readEntryAndFenceLedger(lh.metadata.currentEnsemble.get(i),
-                                                       lh.ledgerId,
-                                                       lh.ledgerKey,
-                                                       
BookieProtocol.LAST_ADD_CONFIRMED,
-                                                       this, i);
+            
lh.bk.getBookieClient().readEntry(lh.metadata.currentEnsemble.get(i),
+                                              lh.ledgerId,
+                                              
BookieProtocol.LAST_ADD_CONFIRMED,
+                                              this, i, 
BookieProtocol.FLAG_DO_FENCING,
+                                              lh.ledgerKey);
         }
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
index 55b7b5cc4..9d62f72d7 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
@@ -191,7 +191,7 @@ public void readLastConfirmedComplete(int rc, long 
lastConfirmed, Object ctx) {
                         // Ledger was empty, so there is no last entry to read
                         cb.readComplete(BKException.Code.NoSuchEntryException, 
ReadOnlyLedgerHandle.this, null, ctx);
                     } else {
-                        asyncReadEntriesInternal(lastConfirmed, lastConfirmed, 
cb, ctx);
+                        asyncReadEntriesInternal(lastConfirmed, lastConfirmed, 
cb, ctx, false);
                     }
                 } else {
                     LOG.error("ReadException in asyncReadLastEntry, ledgerId: 
{}, lac: {}, rc:{}",
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java
index bcaf231be..441504c60 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java
@@ -54,7 +54,7 @@ public void initiate() {
             
lh.bk.getBookieClient().readEntry(lh.metadata.currentEnsemble.get(i),
                                          lh.ledgerId,
                                          BookieProtocol.LAST_ADD_CONFIRMED,
-                                         this, i);
+                                         this, i, BookieProtocol.FLAG_NONE);
         }
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 8777d445e..da6368556 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -127,6 +127,7 @@
     protected static final String MAX_PENDING_READ_REQUESTS_PER_THREAD = 
"maxPendingReadRequestsPerThread";
     protected static final String MAX_PENDING_ADD_REQUESTS_PER_THREAD = 
"maxPendingAddRequestsPerThread";
     protected static final String NUM_LONG_POLL_WORKER_THREADS = 
"numLongPollWorkerThreads";
+    protected static final String NUM_HIGH_PRIORITY_WORKER_THREADS = 
"numHighPriorityWorkerThreads";
 
     // Long poll parameters
     protected static final String REQUEST_TIMER_TICK_DURATION_MILLISEC = 
"requestTimerTickDurationMs";
@@ -1288,6 +1289,29 @@ public int getNumLongPollWorkerThreads() {
         return getInt(NUM_LONG_POLL_WORKER_THREADS, 10);
     }
 
+    /**
+     * Set the number of threads that should be used for high priority requests
+     * (i.e. recovery reads and adds, and fencing)
+     *
+     * @param numThreads
+     *          number of threads to handle high priority requests.
+     * @return server configuration
+     */
+    public ServerConfiguration setNumHighPriorityWorkerThreads(int numThreads) 
{
+        setProperty(NUM_HIGH_PRIORITY_WORKER_THREADS, numThreads);
+        return this;
+    }
+
+    /**
+     * Get the number of threads that should be used for high priority requests
+     * (i.e. recovery reads and adds, and fencing)
+     * @return number of threads to handle high priority requests.
+     */
+    public int getNumHighPriorityWorkerThreads() {
+        return getInt(NUM_HIGH_PRIORITY_WORKER_THREADS, 8);
+    }
+
+
     /**
      * Set the number of threads that would handle read requests.
      *
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index 1623324d8..0bd9fe8e1 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -369,36 +369,6 @@ public void recycle() {
         }
     }
 
-    public void readEntryAndFenceLedger(final BookieSocketAddress addr,
-                                        final long ledgerId,
-                                        final byte[] masterKey,
-                                        final long entryId,
-                                        final ReadEntryCallback cb,
-                                        final Object ctx) {
-        closeLock.readLock().lock();
-        try {
-            final PerChannelBookieClientPool client = lookupClient(addr);
-            if (client == null) {
-                
completeRead(getRc(BKException.Code.BookieHandleNotAvailableException),
-                             ledgerId, entryId, null, cb, ctx);
-                return;
-            }
-
-            client.obtain(new GenericCallback<PerChannelBookieClient>() {
-                @Override
-                public void operationComplete(final int rc, 
PerChannelBookieClient pcbc) {
-                    if (rc != BKException.Code.OK) {
-                        completeRead(rc, ledgerId, entryId, null, cb, ctx);
-                        return;
-                    }
-                    pcbc.readEntryAndFenceLedger(ledgerId, masterKey, entryId, 
cb, ctx);
-                }
-            }, ledgerId);
-        } finally {
-            closeLock.readLock().unlock();
-        }
-    }
-
     public void readLac(final BookieSocketAddress addr, final long ledgerId, 
final ReadLacCallback cb,
             final Object ctx) {
         closeLock.readLock().lock();
@@ -434,8 +404,13 @@ public void safeRun() {
         }
     }
 
+    public void readEntry(BookieSocketAddress addr, long ledgerId, long 
entryId,
+                          ReadEntryCallback cb, Object ctx, int flags) {
+        readEntry(addr, ledgerId, entryId, cb, ctx, flags, null);
+    }
+
     public void readEntry(final BookieSocketAddress addr, final long ledgerId, 
final long entryId,
-                          final ReadEntryCallback cb, final Object ctx) {
+                          final ReadEntryCallback cb, final Object ctx, int 
flags, byte[] masterKey) {
         closeLock.readLock().lock();
         try {
             final PerChannelBookieClientPool client = lookupClient(addr);
@@ -452,7 +427,7 @@ public void operationComplete(final int rc, 
PerChannelBookieClient pcbc) {
                         completeRead(rc, ledgerId, entryId, null, cb, ctx);
                         return;
                     }
-                    pcbc.readEntry(ledgerId, entryId, cb, ctx);
+                    pcbc.readEntry(ledgerId, entryId, cb, ctx, flags, 
masterKey);
                 }
             }, ledgerId);
         } finally {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
index 375cabaa4..4594ef161 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
@@ -184,7 +184,7 @@ public Object decode(ByteBuf packet)
                     byte[] masterKey = readMasterKey(packet);
                     return new BookieProtocol.ReadRequest(version, ledgerId, 
entryId, flags, masterKey);
                 } else {
-                    return new BookieProtocol.ReadRequest(version, ledgerId, 
entryId, flags);
+                    return new BookieProtocol.ReadRequest(version, ledgerId, 
entryId, flags, null);
                 }
             case BookieProtocol.AUTH:
                 BookkeeperProtocol.AuthMessage.Builder builder = 
BookkeeperProtocol.AuthMessage.newBuilder();
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
index 3136c57ec..9ae63163a 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
@@ -182,6 +182,7 @@ public static short getFlags(int packetHeader) {
     short FLAG_NONE = 0x0;
     short FLAG_DO_FENCING = 0x0001;
     short FLAG_RECOVERY_ADD = 0x0002;
+    short FLAG_HIGH_PRIORITY = 0x0004;
 
     /**
      * A Bookie request object.
@@ -233,6 +234,10 @@ boolean hasMasterKey() {
             return masterKey;
         }
 
+        boolean isHighPriority() {
+            return (flags & FLAG_HIGH_PRIORITY) == FLAG_HIGH_PRIORITY;
+        }
+
         @Override
         public String toString() {
             return String.format("Op(%d)[Ledger:%d,Entry:%d]", opCode, 
ledgerId, entryId);
@@ -352,16 +357,12 @@ public void recycle() {
      * A Request that reads data.
      */
     class ReadRequest extends Request {
-        ReadRequest(byte protocolVersion, long ledgerId, long entryId, short 
flags) {
-            init(protocolVersion, READENTRY, ledgerId, entryId, flags, null);
-        }
-
         ReadRequest(byte protocolVersion, long ledgerId, long entryId,
                     short flags, byte[] masterKey) {
             init(protocolVersion, READENTRY, ledgerId, entryId, flags, 
masterKey);
         }
 
-        boolean isFencingRequest() {
+        boolean isFencing() {
             return (flags & FLAG_DO_FENCING) == FLAG_DO_FENCING;
         }
     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 24a3925c6..94a43a956 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -20,6 +20,7 @@
  */
 package org.apache.bookkeeper.proto;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY;
 import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_REQUEST;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CHANNEL_WRITE;
@@ -40,6 +41,7 @@
 import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAST_ENTRY_NOENTRY_ERROR;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_LAC;
 import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_LAC_REQUEST;
+import static org.apache.bookkeeper.proto.RequestUtils.hasFlag;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.protobuf.ByteString;
@@ -109,6 +111,11 @@
      */
     private final OrderedSafeExecutor longPollThreadPool;
 
+    /**
+     * The threadpool used to execute high priority requests.
+     */
+    private final OrderedSafeExecutor highPriorityThreadPool;
+
     /**
      * The Timer used to time out requests for long polling.
      */
@@ -154,9 +161,12 @@ public BookieRequestProcessor(ServerConfiguration 
serverCfg, Bookie bookie,
                 statsLogger);
         this.longPollThreadPool = createExecutor(
                 this.serverCfg.getNumLongPollWorkerThreads(),
-                "BookieLongPollThread",
-                OrderedScheduler.NO_TASK_LIMIT,
-                statsLogger);
+                "BookieLongPollThread-" + serverCfg.getBookiePort(),
+                OrderedScheduler.NO_TASK_LIMIT, statsLogger);
+        this.highPriorityThreadPool = createExecutor(
+                this.serverCfg.getNumHighPriorityWorkerThreads(),
+                "BookieHighPriorityThread-" + serverCfg.getBookiePort(),
+                OrderedScheduler.NO_TASK_LIMIT, statsLogger);
         this.requestTimer = new HashedWheelTimer(
             new 
ThreadFactoryBuilder().setNameFormat("BookieRequestTimer-%d").build(),
             this.serverCfg.getRequestTimerTickDurationMs(),
@@ -195,6 +205,7 @@ public void close() {
         shutdownExecutor(writeThreadPool);
         shutdownExecutor(readThreadPool);
         shutdownExecutor(longPollThreadPool);
+        shutdownExecutor(highPriorityThreadPool);
     }
 
     private OrderedSafeExecutor createExecutor(
@@ -276,10 +287,12 @@ public void processRequest(Object msg, Channel c) {
             // process packet
             switch (r.getOpCode()) {
                 case BookieProtocol.ADDENTRY:
-                    processAddRequest(r, c);
+                    checkArgument(r instanceof 
BookieProtocol.ParsedAddRequest);
+                    processAddRequest((BookieProtocol.ParsedAddRequest) r, c);
                     break;
                 case BookieProtocol.READENTRY:
-                    processReadRequest(r, c);
+                    checkArgument(r instanceof BookieProtocol.ReadRequest);
+                    processReadRequest((BookieProtocol.ReadRequest) r, c);
                     break;
                 default:
                     LOG.error("Unknown op type {}, sending error", 
r.getOpCode());
@@ -312,15 +325,23 @@ private void processReadLacRequestV3(final 
BookkeeperProtocol.Request r, final C
 
     private void processAddRequestV3(final BookkeeperProtocol.Request r, final 
Channel c) {
         WriteEntryProcessorV3 write = new WriteEntryProcessorV3(r, c, this);
-        if (null == writeThreadPool) {
+
+        final OrderedSafeExecutor threadPool;
+        if (RequestUtils.isHighPriority(r)) {
+            threadPool = highPriorityThreadPool;
+        } else {
+            threadPool = writeThreadPool;
+        }
+
+        if (null == threadPool) {
             write.run();
         } else {
             try {
-                writeThreadPool.submitOrdered(r.getAddRequest().getLedgerId(), 
write);
+                threadPool.submitOrdered(r.getAddRequest().getLedgerId(), 
write);
             } catch (RejectedExecutionException e) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Failed to process request to add entry at 
{}:{}. Too many pending requests",
-                            r.getAddRequest().getLedgerId(), 
r.getAddRequest().getEntryId());
+                              r.getAddRequest().getLedgerId(), 
r.getAddRequest().getEntryId());
                 }
                 BookkeeperProtocol.AddResponse.Builder addResponse = 
BookkeeperProtocol.AddResponse.newBuilder()
                         .setLedgerId(r.getAddRequest().getLedgerId())
@@ -337,48 +358,51 @@ private void processAddRequestV3(final 
BookkeeperProtocol.Request r, final Chann
     }
 
     private void processReadRequestV3(final BookkeeperProtocol.Request r, 
final Channel c) {
-        ExecutorService fenceThreadPool =
-          null == readThreadPool ? null : readThreadPool.chooseThread(c);
-        ExecutorService lpThreadPool =
-          null == longPollThreadPool ? null : 
longPollThreadPool.chooseThread(c);
-        ReadEntryProcessorV3 read;
+        ExecutorService fenceThread = null == highPriorityThreadPool ? null : 
highPriorityThreadPool.chooseThread(c);
+
+        final ReadEntryProcessorV3 read;
+        final OrderedSafeExecutor threadPool;
         if (RequestUtils.isLongPollReadRequest(r.getReadRequest())) {
-            read = new LongPollReadEntryProcessorV3(
-                r,
-                c,
-                this,
-                fenceThreadPool,
-                lpThreadPool,
-                requestTimer);
-            if (null == longPollThreadPool) {
-                read.run();
+            ExecutorService lpThread = null == longPollThreadPool ? null : 
longPollThreadPool.chooseThread(c);
+
+            read = new LongPollReadEntryProcessorV3(r, c, this, fenceThread,
+                                                    lpThread, requestTimer);
+            threadPool = longPollThreadPool;
+        } else {
+            read = new ReadEntryProcessorV3(r, c, this, fenceThread);
+
+            // If it's a high priority read (fencing or as part of recovery 
process), we want to make sure it
+            // gets executed as fast as possible, so bypass the normal 
readThreadPool
+            // and execute in highPriorityThreadPool
+            boolean isHighPriority = RequestUtils.isHighPriority(r)
+                || hasFlag(r.getReadRequest(), 
BookkeeperProtocol.ReadRequest.Flag.FENCE_LEDGER);
+            if (isHighPriority) {
+                threadPool = highPriorityThreadPool;
             } else {
-                
longPollThreadPool.submitOrdered(r.getReadRequest().getLedgerId(), read);
+                threadPool = readThreadPool;
             }
+        }
+
+        if (null == threadPool) {
+            read.run();
         } else {
-            read = new ReadEntryProcessorV3(r, c, this, fenceThreadPool);
-            if (null == readThreadPool) {
-                read.run();
-            } else {
-                try {
-                    
readThreadPool.submitOrdered(r.getReadRequest().getLedgerId(), read);
-                } catch (RejectedExecutionException e) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Failed to process request to read entry at 
{}:{}. Too many pending requests",
-                                r.getReadRequest().getLedgerId(), 
r.getReadRequest().getEntryId());
-                    }
-                    BookkeeperProtocol.ReadResponse.Builder readResponse =
-                            BookkeeperProtocol.ReadResponse.newBuilder()
-                                    
.setLedgerId(r.getAddRequest().getLedgerId())
-                                    .setEntryId(r.getAddRequest().getEntryId())
-                                    
.setStatus(BookkeeperProtocol.StatusCode.ETOOMANYREQUESTS);
-                    BookkeeperProtocol.Response.Builder response = 
BookkeeperProtocol.Response.newBuilder()
-                            .setHeader(read.getHeader())
-                            .setStatus(readResponse.getStatus())
-                            .setReadResponse(readResponse);
-                    BookkeeperProtocol.Response resp = response.build();
-                    read.sendResponse(readResponse.getStatus(), resp, 
readRequestStats);
+            try {
+                threadPool.submitOrdered(r.getReadRequest().getLedgerId(), 
read);
+            } catch (RejectedExecutionException e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Failed to process request to read entry at 
{}:{}. Too many pending requests",
+                              r.getReadRequest().getLedgerId(), 
r.getReadRequest().getEntryId());
                 }
+                BookkeeperProtocol.ReadResponse.Builder readResponse = 
BookkeeperProtocol.ReadResponse.newBuilder()
+                    .setLedgerId(r.getAddRequest().getLedgerId())
+                    .setEntryId(r.getAddRequest().getEntryId())
+                    .setStatus(BookkeeperProtocol.StatusCode.ETOOMANYREQUESTS);
+                BookkeeperProtocol.Response.Builder response = 
BookkeeperProtocol.Response.newBuilder()
+                    .setHeader(read.getHeader())
+                    .setStatus(readResponse.getStatus())
+                    .setReadResponse(readResponse);
+                BookkeeperProtocol.Response resp = response.build();
+                read.sendResponse(readResponse.getStatus(), resp, 
readRequestStats);
             }
         }
     }
@@ -435,13 +459,23 @@ private void processGetBookieInfoRequestV3(final 
BookkeeperProtocol.Request r, f
         }
     }
 
-    private void processAddRequest(final BookieProtocol.Request r, final 
Channel c) {
+    private void processAddRequest(final BookieProtocol.ParsedAddRequest r, 
final Channel c) {
         WriteEntryProcessor write = WriteEntryProcessor.create(r, c, this);
-        if (null == writeThreadPool) {
+
+        // If it's a high priority add (usually as part of recovery process), we want to make sure it gets
+        // executed as fast as possible, so bypass the normal writeThreadPool and execute in highPriorityThreadPool
+        final OrderedSafeExecutor threadPool;
+        if (r.isHighPriority()) {
+            threadPool = highPriorityThreadPool;
+        } else {
+            threadPool = writeThreadPool;
+        }
+
+        if (null == threadPool) {
             write.run();
         } else {
             try {
-                writeThreadPool.submitOrdered(r.getLedgerId(), write);
+                threadPool.submitOrdered(r.getLedgerId(), write);
             } catch (RejectedExecutionException e) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Failed to process request to add entry at 
{}:{}. Too many pending requests", r.ledgerId,
@@ -454,13 +488,24 @@ private void processAddRequest(final 
BookieProtocol.Request r, final Channel c)
         }
     }
 
-    private void processReadRequest(final BookieProtocol.Request r, final 
Channel c) {
+    private void processReadRequest(final BookieProtocol.ReadRequest r, final 
Channel c) {
         ReadEntryProcessor read = ReadEntryProcessor.create(r, c, this);
-        if (null == readThreadPool) {
+
+        // If it's a high priority read (fencing or as part of recovery process), we want to make sure it
+        // gets executed as fast as possible, so bypass the normal readThreadPool
+        // and execute in highPriorityThreadPool
+        final OrderedSafeExecutor threadPool;
+        if (r.isHighPriority() || r.isFencing()) {
+            threadPool = highPriorityThreadPool;
+        } else {
+            threadPool = readThreadPool;
+        }
+
+        if (null == threadPool) {
             read.run();
         } else {
             try {
-                readThreadPool.submitOrdered(r.getLedgerId(), read);
+                threadPool.submitOrdered(r.getLedgerId(), read);
             } catch (RejectedExecutionException e) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Failed to process request to read entry at 
{}:{}. Too many pending requests", r.ledgerId,
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
index eda5b92a0..b7dee2d4a 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
@@ -31,14 +31,14 @@
 /**
  * A base class for bookeeper packet processors.
  */
-abstract class PacketProcessorBase extends SafeRunnable {
+abstract class PacketProcessorBase<T extends Request> extends SafeRunnable {
     private static final Logger logger = 
LoggerFactory.getLogger(PacketProcessorBase.class);
-    Request request;
+    T request;
     Channel channel;
     BookieRequestProcessor requestProcessor;
     long enqueueNanos;
 
-    protected void init(Request request, Channel channel, 
BookieRequestProcessor requestProcessor) {
+    protected void init(T request, Channel channel, BookieRequestProcessor 
requestProcessor) {
         this.request = request;
         this.channel = channel;
         this.requestProcessor = requestProcessor;
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 1173d77df..7735b982b 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -144,7 +144,7 @@
                         BKException.Code.LedgerExistException,
                         BKException.Code.DuplicateEntryIdException,
                         BKException.Code.WriteOnReadOnlyBookieException));
-
+    private static final int DEFAULT_HIGH_PRIORITY_VALUE = 100; // We may add finer grained priority later.
     private static final AtomicLong txnIdGenerator = new AtomicLong(0);
 
     final BookieSocketAddress addr;
@@ -571,6 +571,9 @@ void addEntry(final long ledgerId, byte[] masterKey, final 
long entryId, ByteBuf
                     .setVersion(ProtocolVersion.VERSION_THREE)
                     .setOperation(OperationType.ADD_ENTRY)
                     .setTxnId(txnId);
+            if (((short) options & BookieProtocol.FLAG_HIGH_PRIORITY) == 
BookieProtocol.FLAG_HIGH_PRIORITY) {
+                headerBuilder.setPriority(DEFAULT_HIGH_PRIORITY_VALUE);
+            }
 
             byte[] toSendArray = toSend.toArray();
             AddRequest.Builder addBuilder = AddRequest.newBuilder()
@@ -583,6 +586,7 @@ void addEntry(final long ledgerId, byte[] masterKey, final 
long entryId, ByteBuf
                 addBuilder.setFlag(AddRequest.Flag.RECOVERY_ADD);
             }
 
+
             request = Request.newBuilder()
                     .setHeader(headerBuilder)
                     .setAddRequest(addBuilder)
@@ -604,55 +608,12 @@ void addEntry(final long ledgerId, byte[] masterKey, 
final long entryId, ByteBuf
         }
     }
 
-    public void readEntryAndFenceLedger(final long ledgerId, byte[] masterKey,
-                                        final long entryId,
-                                        ReadEntryCallback cb, Object ctx) {
-        Object request = null;
-        CompletionKey completionKey = null;
-        if (useV2WireProtocol) {
-            completionKey = acquireV2Key(ledgerId, entryId, 
OperationType.READ_ENTRY);
-            request = new 
BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, 
entryId,
-                    BookieProtocol.FLAG_DO_FENCING, masterKey);
-        } else {
-            final long txnId = getTxnId();
-            completionKey = new V3CompletionKey(txnId, 
OperationType.READ_ENTRY);
-
-            // Build the request and calculate the total size to be included in the packet.
-            BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
-                    .setVersion(ProtocolVersion.VERSION_THREE)
-                    .setOperation(OperationType.READ_ENTRY)
-                    .setTxnId(txnId);
-
-            ReadRequest.Builder readBuilder = ReadRequest.newBuilder()
-                    .setLedgerId(ledgerId)
-                    .setEntryId(entryId)
-                    .setMasterKey(ByteString.copyFrom(masterKey))
-                    .setFlag(ReadRequest.Flag.FENCE_LEDGER);
-
-            request = Request.newBuilder()
-                    .setHeader(headerBuilder)
-                    .setReadRequest(readBuilder)
-                    .build();
-        }
-
-        ReadCompletion readCompletion = new ReadCompletion(completionKey, cb, 
ctx, ledgerId, entryId);
-        CompletionValue existingValue = 
completionObjects.putIfAbsent(completionKey, readCompletion);
-        if (existingValue != null) {
-            // There's a pending read request on same ledger/entry. Use the multimap to track all of them
-            synchronized (completionObjectsV2Conflicts) {
-                completionObjectsV2Conflicts.put(completionKey, 
readCompletion);
-            }
-        }
-
-        writeAndFlush(channel, completionKey, request);
-    }
-
     public void readLac(final long ledgerId, ReadLacCallback cb, Object ctx) {
         Object request = null;
         CompletionKey completionKey = null;
         if (useV2WireProtocol) {
             request = new 
BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION,
-                    ledgerId, 0, (short) 0);
+                                                     ledgerId, 0, (short) 0, 
null);
             completionKey = acquireV2Key(ledgerId, 0, OperationType.READ_LAC);
         } else {
             final long txnId = getTxnId();
@@ -686,7 +647,8 @@ public void readEntryWaitForLACUpdate(final long ledgerId,
                                           final boolean piggyBackEntry,
                                           ReadEntryCallback cb,
                                           Object ctx) {
-        readEntryInternal(ledgerId, entryId, previousLAC, timeOutInMillis, 
piggyBackEntry, cb, ctx);
+        readEntryInternal(ledgerId, entryId, previousLAC, timeOutInMillis,
+                          piggyBackEntry, cb, ctx, (short) 0, null);
     }
 
     /**
@@ -695,8 +657,11 @@ public void readEntryWaitForLACUpdate(final long ledgerId,
     public void readEntry(final long ledgerId,
                           final long entryId,
                           ReadEntryCallback cb,
-                          Object ctx) {
-        readEntryInternal(ledgerId, entryId, null, null, false, cb, ctx);
+                          Object ctx,
+                          int flags,
+                          byte[] masterKey) {
+        readEntryInternal(ledgerId, entryId, null, null, false,
+                          cb, ctx, (short) flags, masterKey);
     }
 
     private void readEntryInternal(final long ledgerId,
@@ -705,12 +670,14 @@ private void readEntryInternal(final long ledgerId,
                                    final Long timeOutInMillis,
                                    final boolean piggyBackEntry,
                                    final ReadEntryCallback cb,
-                                   final Object ctx) {
+                                   final Object ctx,
+                                   int flags,
+                                   byte[] masterKey) {
         Object request = null;
         CompletionKey completionKey = null;
         if (useV2WireProtocol) {
             request = new 
BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION,
-                    ledgerId, entryId, (short) 0);
+                    ledgerId, entryId, (short) flags, masterKey);
             completionKey = acquireV2Key(ledgerId, entryId, 
OperationType.READ_ENTRY);
         } else {
             final long txnId = getTxnId();
@@ -721,6 +688,9 @@ private void readEntryInternal(final long ledgerId,
                     .setVersion(ProtocolVersion.VERSION_THREE)
                     .setOperation(OperationType.READ_ENTRY)
                     .setTxnId(txnId);
+            if (((short) flags & BookieProtocol.FLAG_HIGH_PRIORITY) == 
BookieProtocol.FLAG_HIGH_PRIORITY) {
+                headerBuilder.setPriority(DEFAULT_HIGH_PRIORITY_VALUE);
+            }
 
             ReadRequest.Builder readBuilder = ReadRequest.newBuilder()
                     .setLedgerId(ledgerId)
@@ -750,6 +720,17 @@ private void readEntryInternal(final long ledgerId,
                 readBuilder = 
readBuilder.setFlag(ReadRequest.Flag.ENTRY_PIGGYBACK);
             }
 
+            // Only one flag can be set on the read requests
+            if (((short) flags & BookieProtocol.FLAG_DO_FENCING) == 
BookieProtocol.FLAG_DO_FENCING) {
+                readBuilder.setFlag(ReadRequest.Flag.FENCE_LEDGER);
+                if (masterKey == null) {
+                    
cb.readEntryComplete(BKException.Code.IncorrectParameterException,
+                                         ledgerId, entryId, null, ctx);
+                    return;
+                }
+                readBuilder.setMasterKey(ByteString.copyFrom(masterKey));
+            }
+
             request = Request.newBuilder()
                     .setHeader(headerBuilder)
                     .setReadRequest(readBuilder)
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
index ee1e930e6..2d4c54040 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
@@ -31,16 +31,16 @@
 
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieException;
-import org.apache.bookkeeper.proto.BookieProtocol.Request;
+import org.apache.bookkeeper.proto.BookieProtocol.ReadRequest;
 import org.apache.bookkeeper.util.MathUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class ReadEntryProcessor extends PacketProcessorBase {
+class ReadEntryProcessor extends PacketProcessorBase<ReadRequest> {
     private static final Logger LOG = 
LoggerFactory.getLogger(ReadEntryProcessor.class);
 
-    public static ReadEntryProcessor create(Request request, Channel channel,
-                              BookieRequestProcessor requestProcessor) {
+    public static ReadEntryProcessor create(ReadRequest request, Channel 
channel,
+                                            BookieRequestProcessor 
requestProcessor) {
         ReadEntryProcessor rep = RECYCLER.get();
         rep.init(request, channel, requestProcessor);
         return rep;
@@ -48,9 +48,6 @@ public static ReadEntryProcessor create(Request request, 
Channel channel,
 
     @Override
     protected void processPacket() {
-        assert (request instanceof BookieProtocol.ReadRequest);
-        BookieProtocol.ReadRequest read = (BookieProtocol.ReadRequest) request;
-
         if (LOG.isDebugEnabled()) {
             LOG.debug("Received new read request: {}", request);
         }
@@ -59,13 +56,13 @@ protected void processPacket() {
         ByteBuf data = null;
         try {
             Future<Boolean> fenceResult = null;
-            if (read.isFencingRequest()) {
+            if (request.isFencing()) {
                 LOG.warn("Ledger: {}  fenced by: {}", request.getLedgerId(), 
channel.remoteAddress());
 
-                if (read.hasMasterKey()) {
-                    fenceResult = 
requestProcessor.bookie.fenceLedger(read.getLedgerId(), read.getMasterKey());
+                if (request.hasMasterKey()) {
+                    fenceResult = 
requestProcessor.bookie.fenceLedger(request.getLedgerId(), 
request.getMasterKey());
                 } else {
-                    LOG.error("Password not provided, Not safe to fence {}", 
read.getLedgerId());
+                    LOG.error("Password not provided, Not safe to fence {}", 
request.getLedgerId());
                     throw 
BookieException.create(BookieException.Code.UnauthorizedAccessException);
                 }
             }
@@ -94,17 +91,17 @@ protected void processPacket() {
                     }
                 } catch (InterruptedException ie) {
                     Thread.currentThread().interrupt();
-                    LOG.error("Interrupting fence read entry {}", read, ie);
+                    LOG.error("Interrupting fence read entry {}", request, ie);
                     errorCode = BookieProtocol.EIO;
                     data.release();
                     data = null;
                 } catch (ExecutionException ee) {
-                    LOG.error("Failed to fence read entry {}", read, ee);
+                    LOG.error("Failed to fence read entry {}", request, ee);
                     errorCode = BookieProtocol.EIO;
                     data.release();
                     data = null;
                 } catch (TimeoutException te) {
-                    LOG.error("Timeout to fence read entry {}", read, te);
+                    LOG.error("Timeout to fence read entry {}", request, te);
                     errorCode = BookieProtocol.EIO;
                     data.release();
                     data = null;
@@ -114,33 +111,35 @@ protected void processPacket() {
             }
         } catch (Bookie.NoLedgerException e) {
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Error reading {}", read, e);
+                LOG.debug("Error reading {}", request, e);
             }
             errorCode = BookieProtocol.ENOLEDGER;
         } catch (Bookie.NoEntryException e) {
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Error reading {}", read, e);
+                LOG.debug("Error reading {}", request, e);
             }
             errorCode = BookieProtocol.ENOENTRY;
         } catch (IOException e) {
-            LOG.error("Error reading {}", read, e);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Error reading {}", request, e);
+            }
             errorCode = BookieProtocol.EIO;
         } catch (BookieException e) {
-            LOG.error("Unauthorized access to ledger {}", read.getLedgerId(), 
e);
+            LOG.error("Unauthorized access to ledger {}", 
request.getLedgerId(), e);
             errorCode = BookieProtocol.EUA;
         } catch (Throwable t) {
-            LOG.error("Unexpected exception reading at {}:{} : {}", 
read.getLedgerId(), read.getEntryId(),
-                    t.getMessage(), t);
+            LOG.error("Unexpected exception reading at {}:{} : {}", 
request.getLedgerId(), request.getEntryId(),
+                      t.getMessage(), t);
             errorCode = BookieProtocol.EBADREQ;
         }
 
         if (LOG.isTraceEnabled()) {
-            LOG.trace("Read entry rc = {} for {}", errorCode, read);
+            LOG.trace("Read entry rc = {} for {}", errorCode, request);
         }
         if (errorCode == BookieProtocol.EOK) {
             
requestProcessor.readEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
                     TimeUnit.NANOSECONDS);
-            sendResponse(errorCode, ResponseBuilder.buildReadResponse(data, 
read),
+            sendResponse(errorCode, ResponseBuilder.buildReadResponse(data, 
request),
                          requestProcessor.readRequestStats);
 
         } else {
@@ -148,7 +147,7 @@ protected void processPacket() {
 
             
requestProcessor.readEntryStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
                     TimeUnit.NANOSECONDS);
-            sendResponse(errorCode, 
ResponseBuilder.buildErrorResponse(errorCode, read),
+            sendResponse(errorCode, 
ResponseBuilder.buildErrorResponse(errorCode, request),
                          requestProcessor.readRequestStats);
         }
         recycle();
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestUtils.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestUtils.java
index c735bef26..87d40c7b6 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestUtils.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestUtils.java
@@ -17,22 +17,33 @@
  */
 package org.apache.bookkeeper.proto;
 
-import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadRequest;
-
 /**
  * Utilities for requests.
  */
 class RequestUtils {
 
-    public static boolean isFenceRequest(ReadRequest readRequest) {
-        return readRequest.hasFlag() && 
readRequest.getFlag().equals(ReadRequest.Flag.FENCE_LEDGER);
+    public static boolean isFenceRequest(BookkeeperProtocol.ReadRequest 
readRequest) {
+        return hasFlag(readRequest, 
BookkeeperProtocol.ReadRequest.Flag.FENCE_LEDGER);
     }
 
-    public static boolean isLongPollReadRequest(ReadRequest readRequest) {
+    public static boolean isLongPollReadRequest(BookkeeperProtocol.ReadRequest 
readRequest) {
         return !isFenceRequest(readRequest) && readRequest.hasPreviousLAC();
     }
 
-    public static boolean shouldPiggybackEntry(ReadRequest readRequest) {
-        return readRequest.hasFlag() && 
readRequest.getFlag().equals(ReadRequest.Flag.ENTRY_PIGGYBACK);
+    public static boolean isHighPriority(BookkeeperProtocol.Request request) {
+        return request.getHeader().getPriority() > 0;
+    }
+
+    public static boolean shouldPiggybackEntry(BookkeeperProtocol.ReadRequest 
readRequest) {
+        return hasFlag(readRequest, 
BookkeeperProtocol.ReadRequest.Flag.ENTRY_PIGGYBACK);
+    }
+
+    static boolean hasFlag(BookkeeperProtocol.ReadRequest request, 
BookkeeperProtocol.ReadRequest.Flag flag) {
+        return request.hasFlag() && request.getFlag() == flag;
+    }
+
+    static boolean hasFlag(BookkeeperProtocol.AddRequest request, 
BookkeeperProtocol.AddRequest.Flag flag) {
+        return request.hasFlag() && request.getFlag() == flag;
     }
+
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
index 164a11b42..993700092 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
@@ -27,7 +27,7 @@
 
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.apache.bookkeeper.proto.BookieProtocol.Request;
+import org.apache.bookkeeper.proto.BookieProtocol.ParsedAddRequest;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.util.MathUtils;
 import org.slf4j.Logger;
@@ -36,7 +36,7 @@
 /**
  * Processes add entry requests.
  */
-class WriteEntryProcessor extends PacketProcessorBase implements WriteCallback 
{
+class WriteEntryProcessor extends PacketProcessorBase<ParsedAddRequest> 
implements WriteCallback {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(WriteEntryProcessor.class);
 
@@ -47,8 +47,8 @@ protected void reset() {
         startTimeNanos = -1L;
     }
 
-    public static WriteEntryProcessor create(Request request, Channel channel,
-                               BookieRequestProcessor requestProcessor) {
+    public static WriteEntryProcessor create(ParsedAddRequest request, Channel 
channel,
+                                             BookieRequestProcessor 
requestProcessor) {
         WriteEntryProcessor wep = RECYCLER.get();
         wep.init(request, channel, requestProcessor);
         return wep;
@@ -56,40 +56,38 @@ public static WriteEntryProcessor create(Request request, 
Channel channel,
 
     @Override
     protected void processPacket() {
-        assert (request instanceof BookieProtocol.ParsedAddRequest);
-        BookieProtocol.ParsedAddRequest add = 
(BookieProtocol.ParsedAddRequest) request;
-
         if (requestProcessor.bookie.isReadOnly()) {
             LOG.warn("BookieServer is running in readonly mode,"
                     + " so rejecting the request from the client!");
             sendResponse(BookieProtocol.EREADONLY,
-                         
ResponseBuilder.buildErrorResponse(BookieProtocol.EREADONLY, add),
+                         
ResponseBuilder.buildErrorResponse(BookieProtocol.EREADONLY, request),
                          requestProcessor.addRequestStats);
-            add.release();
-            add.recycle();
+            request.release();
+            request.recycle();
             return;
         }
 
         startTimeNanos = MathUtils.nowInNano();
         int rc = BookieProtocol.EOK;
-        ByteBuf addData = add.getData();
+        ByteBuf addData = request.getData();
         try {
-            if (add.isRecoveryAdd()) {
-                requestProcessor.bookie.recoveryAddEntry(addData, this, 
channel, add.getMasterKey());
+            if (request.isRecoveryAdd()) {
+                requestProcessor.bookie.recoveryAddEntry(addData, this, 
channel, request.getMasterKey());
             } else {
-                requestProcessor.bookie.addEntry(addData, false, this, 
channel, add.getMasterKey());
+                requestProcessor.bookie.addEntry(addData, false, this, 
channel, request.getMasterKey());
             }
         } catch (IOException e) {
-            LOG.error("Error writing " + add, e);
+            LOG.error("Error writing {}", request, e);
             rc = BookieProtocol.EIO;
         } catch (BookieException.LedgerFencedException lfe) {
             LOG.error("Attempt to write to fenced ledger", lfe);
             rc = BookieProtocol.EFENCED;
         } catch (BookieException e) {
-            LOG.error("Unauthorized access to ledger " + add.getLedgerId(), e);
+            LOG.error("Unauthorized access to ledger {}", 
request.getLedgerId(), e);
             rc = BookieProtocol.EUA;
         } catch (Throwable t) {
-            LOG.error("Unexpected exception while writing {}@{} : {}", 
add.ledgerId, add.entryId, t.getMessage(), t);
+            LOG.error("Unexpected exception while writing {}@{} : {}",
+                      request.ledgerId, request.entryId, t.getMessage(), t);
             // some bad request which cause unexpected exception
             rc = BookieProtocol.EBADREQ;
         } finally {
@@ -100,9 +98,9 @@ protected void processPacket() {
             
requestProcessor.addEntryStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
                     TimeUnit.NANOSECONDS);
             sendResponse(rc,
-                         ResponseBuilder.buildErrorResponse(rc, add),
+                         ResponseBuilder.buildErrorResponse(rc, request),
                          requestProcessor.addRequestStats);
-            add.recycle();
+            request.recycle();
         }
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
index f227d8828..7bd78d3f4 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
@@ -114,7 +114,7 @@ public void writeComplete(int rc, long ledgerId, long 
entryId,
         byte[] masterKey = addRequest.getMasterKey().toByteArray();
         ByteBuf entryToAdd = 
Unpooled.wrappedBuffer(addRequest.getBody().asReadOnlyByteBuffer());
         try {
-            if (addRequest.hasFlag() && 
addRequest.getFlag().equals(AddRequest.Flag.RECOVERY_ADD)) {
+            if (RequestUtils.hasFlag(addRequest, 
AddRequest.Flag.RECOVERY_ADD)) {
                 requestProcessor.bookie.recoveryAddEntry(entryToAdd, wcb, 
channel, masterKey);
             } else {
                 requestProcessor.bookie.addEntry(entryToAdd, ackBeforeSync, 
wcb, channel, masterKey);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
index 0536ff804..0828341c8 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
@@ -45,6 +45,7 @@
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
@@ -526,7 +527,8 @@ private boolean verifyFullyReplicated(LedgerHandle lh, long 
untilEntry) throws E
             ReplicationVerificationCallback cb = new 
ReplicationVerificationCallback(numRequests);
             for (long i = startEntryId; i < endEntryId; i++) {
                 for (BookieSocketAddress addr : e.getValue()) {
-                    bkc.getBookieClient().readEntry(addr, lh.getId(), i, cb, 
addr);
+                    bkc.getBookieClient().readEntry(addr, lh.getId(), i,
+                                                    cb, addr, 
BookieProtocol.FLAG_NONE);
                 }
             }
 
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
index ffb85f58e..b7cc373d9 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
@@ -390,56 +390,14 @@ private void setupWriteLedgerMetadata() {
 
     @SuppressWarnings("unchecked")
     protected void setupBookieClientReadEntry() {
-        doAnswer(invokation -> {
-            Object[] args = invokation.getArguments();
-            BookkeeperInternalCallbacks.ReadEntryCallback callback =
-                (BookkeeperInternalCallbacks.ReadEntryCallback) args[4];
-            BookieSocketAddress bookieSocketAddress = (BookieSocketAddress) 
args[0];
-            long ledgerId = (Long) args[1];
-            long entryId = (Long) args[3];
-
-            executor.submitOrdered(ledgerId, () -> {
-                DigestManager macManager = null;
-                try {
-                    macManager = getDigestType(ledgerId);
-                } catch (GeneralSecurityException gse){
-                    LOG.error("Initialize macManager fail", gse);
-                }
-                fencedLedgers.add(ledgerId);
-                MockEntry mockEntry = null;
-                try {
-                    mockEntry = getMockLedgerEntry(ledgerId, 
bookieSocketAddress, entryId);
-                } catch (BKException bke) {
-                    LOG.info("readEntryAndFenceLedger - occur BKException 
{}@{} at {}", entryId, ledgerId,
-                            bookieSocketAddress);
-                    callback.readEntryComplete(bke.getCode(), ledgerId, 
entryId, null, args[5]);
-                }
-                if (mockEntry != null) {
-                    LOG.info("readEntryAndFenceLedger - found mock entry {}@{} 
at {}", entryId, ledgerId,
-                            bookieSocketAddress);
-                    ByteBufList entry = 
macManager.computeDigestAndPackageForSending(entryId,
-                            mockEntry.lastAddConfirmed, 
mockEntry.payload.length,
-                            Unpooled.wrappedBuffer(mockEntry.payload));
-                    callback.readEntryComplete(BKException.Code.OK, ledgerId, 
entryId, ByteBufList.coalesce(entry),
-                            args[5]);
-                    entry.release();
-                } else {
-                    LOG.info("readEntryAndFenceLedger - no such mock entry 
{}@{} at {}", entryId, ledgerId,
-                            bookieSocketAddress);
-                    
callback.readEntryComplete(BKException.Code.NoSuchEntryException, ledgerId, 
entryId, null, args[5]);
-                }
-            });
-            return null;
-        }).when(bookieClient).readEntryAndFenceLedger(any(), anyLong(), any(), 
anyLong(),
-            any(BookkeeperInternalCallbacks.ReadEntryCallback.class), any());
-
-        doAnswer(invokation -> {
+        Answer<Void> answer = invokation -> {
             Object[] args = invokation.getArguments();
             BookieSocketAddress bookieSocketAddress = (BookieSocketAddress) 
args[0];
             long ledgerId = (Long) args[1];
             long entryId = (Long) args[2];
             BookkeeperInternalCallbacks.ReadEntryCallback callback =
                 (BookkeeperInternalCallbacks.ReadEntryCallback) args[3];
+            boolean fenced = (((Integer) args[5]) & 
BookieProtocol.FLAG_DO_FENCING) == BookieProtocol.FLAG_DO_FENCING;
 
             executor.submitOrdered(ledgerId, () -> {
                 DigestManager macManager = null;
@@ -456,6 +414,11 @@ protected void setupBookieClientReadEntry() {
                             bookieSocketAddress);
                     callback.readEntryComplete(bke.getCode(), ledgerId, 
entryId, null, args[5]);
                 }
+
+                if (fenced) {
+                    fencedLedgers.add(ledgerId);
+                }
+
                 if (mockEntry != null) {
                     LOG.info("readEntry - found mock entry {}@{} at {}", 
entryId, ledgerId, bookieSocketAddress);
                     ByteBufList entry = 
macManager.computeDigestAndPackageForSending(entryId,
@@ -470,8 +433,13 @@ protected void setupBookieClientReadEntry() {
                 }
             });
             return null;
-        }).when(bookieClient).readEntry(any(), anyLong(), anyLong(),
-            any(BookkeeperInternalCallbacks.ReadEntryCallback.class), any());
+        };
+        doAnswer(answer).when(bookieClient).readEntry(any(), anyLong(), 
anyLong(),
+                any(BookkeeperInternalCallbacks.ReadEntryCallback.class),
+                any(), anyInt());
+        doAnswer(answer).when(bookieClient).readEntry(any(), anyLong(), 
anyLong(),
+                any(BookkeeperInternalCallbacks.ReadEntryCallback.class),
+                any(), anyInt(), any());
     }
 
     private byte[] extractEntryPayload(long ledgerId, long entryId, 
ByteBufList toSend)
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java
index 954df02c1..cf693ed0f 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java
@@ -106,7 +106,8 @@ private void verifyEntries(LedgerHandle lh, long 
startEntry, long untilEntry,
             ArrayList<BookieSocketAddress> addresses = md.getEnsemble(eid);
             VerificationCallback callback = new 
VerificationCallback(addresses.size());
             for (BookieSocketAddress addr : addresses) {
-                bkc.getBookieClient().readEntry(addr, lh.getId(), eid, 
callback, addr);
+                bkc.getBookieClient().readEntry(addr, lh.getId(), eid,
+                                                callback, addr, 0, null);
             }
             callback.latch.await();
             assertEquals(expectedSuccess, callback.numSuccess.get());
@@ -123,7 +124,8 @@ private void verifyEntriesRange(LedgerHandle lh, long 
startEntry, long untilEntr
             ArrayList<BookieSocketAddress> addresses = md.getEnsemble(eid);
             VerificationCallback callback = new 
VerificationCallback(addresses.size());
             for (BookieSocketAddress addr : addresses) {
-                bkc.getBookieClient().readEntry(addr, lh.getId(), eid, 
callback, addr);
+                bkc.getBookieClient().readEntry(addr, lh.getId(), eid,
+                                                callback, addr, 0, null);
             }
             callback.latch.await();
             assertTrue(expectedSuccess >= callback.numSuccess.get());
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestMaxSizeWorkersQueue.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestMaxSizeWorkersQueue.java
index 612e75477..cc70e024a 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestMaxSizeWorkersQueue.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestMaxSizeWorkersQueue.java
@@ -19,9 +19,18 @@
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Lists;
 
 import java.util.Enumeration;
+import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -48,7 +57,7 @@ public TestMaxSizeWorkersQueue() {
         baseConf.setMaxPendingAddRequestPerThread(1);
     }
 
-    @Test(timeout = 60000)
+    @Test
     public void testReadRejected() throws Exception {
         LedgerHandle lh = bkc.createLedger(1, 1, digestType, new byte[0]);
         byte[] content = new byte[100];
@@ -91,7 +100,7 @@ public void readComplete(int rc, LedgerHandle lh, 
Enumeration<LedgerEntry> seq,
         assertEquals(BKException.Code.TooManyRequestsException, 
rcSecondReadOperation.get());
     }
 
-    @Test(timeout = 60000)
+    @Test
     public void testAddRejected() throws Exception {
         LedgerHandle lh = bkc.createLedger(1, 1, digestType, new byte[0]);
         byte[] content = new byte[100];
@@ -122,4 +131,60 @@ public void addComplete(int rc, LedgerHandle lh, long 
entryId, Object ctx) {
 
         assertTrue(receivedTooManyRequestsException.get());
     }
+
+    @Test
+    public void testRecoveryNotRejected() throws Exception {
+        LedgerHandle lh = bkc.createLedger(1, 1, digestType, new byte[0]);
+        byte[] content = new byte[100];
+
+        final int numEntriesToRead = 1000;
+        // Write a few entries
+        for (int i = 0; i < numEntriesToRead; i++) {
+            lh.addEntry(content);
+        }
+
+        final int numLedgersToRecover = 10;
+        List<Long> ledgersToRecover = Lists.newArrayList();
+        for (int i = 0; i < numLedgersToRecover; i++) {
+            LedgerHandle lhr = bkc.createLedger(1, 1, digestType, new byte[0]);
+            lhr.addEntry(content);
+            // Leave the ledger in open state
+            ledgersToRecover.add(lhr.getId());
+        }
+
+        ExecutorService executor = Executors.newCachedThreadPool();
+        final CyclicBarrier barrier = new CyclicBarrier(1 + 
numLedgersToRecover);
+
+        List<Future<?>> futures = Lists.newArrayList();
+        futures.add(executor.submit(new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                barrier.await();
+                try {
+                    lh.readEntries(0, numEntriesToRead - 1);
+                    fail("Should have thrown exception");
+                } catch (Exception e) {
+                    // Expected
+                }
+                return null;
+            }
+        }));
+
+        for (long ledgerId : ledgersToRecover) {
+            futures.add(executor.submit(new Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    barrier.await();
+
+                    // Recovery should always succeed
+                    bkc.openLedger(ledgerId, digestType, new byte[0]);
+                    return null;
+                }
+            }));
+        }
+
+        for (Future<?> future : futures) {
+            future.get();
+        }
+    }
 }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java
index 00dbb4bd5..9e230b6af 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java
@@ -166,7 +166,7 @@ public void testAuthFail() throws Exception {
         }
 
         client.sendRequest(new 
ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION,
-                                           1L, 1L, (short) 0));
+                                           1L, 1L, (short) 0, null));
         Response response = client.takeResponse();
         assertEquals("Should have failed",
                      response.getErrorCode(), BookieProtocol.EUA);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java
new file mode 100644
index 000000000..c7601d0a9
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java
@@ -0,0 +1,78 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.proto;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import com.google.protobuf.ByteString;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadRequest;
+import org.junit.Test;
+
+/**
+ * Test utility methods from bookie request processor.
+ */
+public class TestBookieRequestProcessor {
+    @Test
+    public void testFlagsV3() {
+        ReadRequest read = ReadRequest.newBuilder()
+            .setLedgerId(10).setEntryId(1)
+            .setFlag(ReadRequest.Flag.FENCE_LEDGER).build();
+        assertTrue(RequestUtils.hasFlag(read, ReadRequest.Flag.FENCE_LEDGER));
+        assertFalse(RequestUtils.hasFlag(read, 
ReadRequest.Flag.ENTRY_PIGGYBACK));
+
+        read = ReadRequest.newBuilder()
+            .setLedgerId(10).setEntryId(1)
+            .setFlag(ReadRequest.Flag.ENTRY_PIGGYBACK).build();
+        assertFalse(RequestUtils.hasFlag(read, ReadRequest.Flag.FENCE_LEDGER));
+        assertTrue(RequestUtils.hasFlag(read, 
ReadRequest.Flag.ENTRY_PIGGYBACK));
+
+        read = ReadRequest.newBuilder()
+            .setLedgerId(10).setEntryId(1)
+            .build();
+        assertFalse(RequestUtils.hasFlag(read, ReadRequest.Flag.FENCE_LEDGER));
+        assertFalse(RequestUtils.hasFlag(read, 
ReadRequest.Flag.ENTRY_PIGGYBACK));
+
+        AddRequest add = AddRequest.newBuilder()
+            .setLedgerId(10).setEntryId(1)
+            .setFlag(AddRequest.Flag.RECOVERY_ADD)
+            .setMasterKey(ByteString.EMPTY)
+            .setBody(ByteString.EMPTY)
+            .build();
+        assertTrue(RequestUtils.hasFlag(add, AddRequest.Flag.RECOVERY_ADD));
+
+        add = AddRequest.newBuilder()
+            .setLedgerId(10).setEntryId(1)
+            .setMasterKey(ByteString.EMPTY)
+            .setBody(ByteString.EMPTY)
+            .build();
+        assertFalse(RequestUtils.hasFlag(add, AddRequest.Flag.RECOVERY_ADD));
+
+        add = AddRequest.newBuilder()
+            .setLedgerId(10).setEntryId(1)
+            .setFlag(AddRequest.Flag.RECOVERY_ADD)
+            .setMasterKey(ByteString.EMPTY)
+            .setBody(ByteString.EMPTY)
+            .build();
+        assertTrue(RequestUtils.hasFlag(add, AddRequest.Flag.RECOVERY_ADD));
+    }
+}
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
index d08dd8ad8..90788b1e7 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
@@ -277,7 +277,7 @@ public void safeRun() {
                     return;
                 }
 
-                client.readEntryAndFenceLedger(1, 
"00000111112222233333".getBytes(), 1, cb, null);
+                client.readEntry(1, 1, cb, null, 
BookieProtocol.FLAG_DO_FENCING, "00000111112222233333".getBytes());
             }
         });
 
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
index 4cf25912d..7ecce0ba9 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
@@ -162,7 +162,7 @@ public void testWriteGaps() throws Exception {
         synchronized (arc) {
             arc.wait(1000);
             assertEquals(0, arc.rc);
-            bc.readEntry(addr, 1, 1, recb, arc);
+            bc.readEntry(addr, 1, 1, recb, arc, BookieProtocol.FLAG_NONE);
             arc.wait(1000);
             assertEquals(0, arc.rc);
             assertEquals(1, arc.entry.getInt());
@@ -181,63 +181,63 @@ public void testWriteGaps() throws Exception {
             notifyObject.wait();
         }
         synchronized (arc) {
-            bc.readEntry(addr, 1, 6, recb, arc);
+            bc.readEntry(addr, 1, 6, recb, arc, BookieProtocol.FLAG_NONE);
             arc.wait(1000);
             assertEquals(BKException.Code.NoSuchEntryException, arc.rc);
         }
         synchronized (arc) {
-            bc.readEntry(addr, 1, 7, recb, arc);
+            bc.readEntry(addr, 1, 7, recb, arc, BookieProtocol.FLAG_NONE);
             arc.wait(1000);
             assertEquals(0, arc.rc);
-            assertEquals(7, arc.entry.getInt());
+            assertEquals(7, arc.entry.getInt(), BookieProtocol.FLAG_NONE);
         }
         synchronized (arc) {
-            bc.readEntry(addr, 1, 1, recb, arc);
+            bc.readEntry(addr, 1, 1, recb, arc, BookieProtocol.FLAG_NONE);
             arc.wait(1000);
             assertEquals(0, arc.rc);
             assertEquals(1, arc.entry.getInt());
         }
         synchronized (arc) {
-            bc.readEntry(addr, 1, 2, recb, arc);
+            bc.readEntry(addr, 1, 2, recb, arc, BookieProtocol.FLAG_NONE);
             arc.wait(1000);
             assertEquals(0, arc.rc);
             assertEquals(2, arc.entry.getInt());
         }
         synchronized (arc) {
-            bc.readEntry(addr, 1, 3, recb, arc);
+            bc.readEntry(addr, 1, 3, recb, arc, BookieProtocol.FLAG_NONE);
             arc.wait(1000);
             assertEquals(0, arc.rc);
             assertEquals(3, arc.entry.getInt());
         }
         synchronized (arc) {
-            bc.readEntry(addr, 1, 4, recb, arc);
+            bc.readEntry(addr, 1, 4, recb, arc, BookieProtocol.FLAG_NONE);
             arc.wait(1000);
             assertEquals(BKException.Code.NoSuchEntryException, arc.rc);
         }
         synchronized (arc) {
-            bc.readEntry(addr, 1, 11, recb, arc);
+            bc.readEntry(addr, 1, 11, recb, arc, BookieProtocol.FLAG_NONE);
             arc.wait(1000);
             assertEquals(0, arc.rc);
             assertEquals(11, arc.entry.getInt());
         }
         synchronized (arc) {
-            bc.readEntry(addr, 1, 5, recb, arc);
+            bc.readEntry(addr, 1, 5, recb, arc, BookieProtocol.FLAG_NONE);
             arc.wait(1000);
             assertEquals(0, arc.rc);
             assertEquals(5, arc.entry.getInt());
         }
         synchronized (arc) {
-            bc.readEntry(addr, 1, 10, recb, arc);
+            bc.readEntry(addr, 1, 10, recb, arc, BookieProtocol.FLAG_NONE);
             arc.wait(1000);
             assertEquals(BKException.Code.NoSuchEntryException, arc.rc);
         }
         synchronized (arc) {
-            bc.readEntry(addr, 1, 12, recb, arc);
+            bc.readEntry(addr, 1, 12, recb, arc, BookieProtocol.FLAG_NONE);
             arc.wait(1000);
             assertEquals(BKException.Code.NoSuchEntryException, arc.rc);
         }
         synchronized (arc) {
-            bc.readEntry(addr, 1, 13, recb, arc);
+            bc.readEntry(addr, 1, 13, recb, arc, BookieProtocol.FLAG_NONE);
             arc.wait(1000);
             assertEquals(BKException.Code.NoSuchEntryException, arc.rc);
         }
@@ -259,7 +259,7 @@ public void testNoLedger() throws Exception {
         BookieClient bc = new BookieClient(new ClientConfiguration(), 
eventLoopGroup, executor,
                                            scheduler, 
NullStatsLogger.INSTANCE);
         synchronized (arc) {
-            bc.readEntry(addr, 2, 13, recb, arc);
+            bc.readEntry(addr, 2, 13, recb, arc, BookieProtocol.FLAG_NONE);
             arc.wait(1000);
             assertEquals(BKException.Code.NoSuchLedgerExistsException, arc.rc);
         }
diff --git a/conf/bk_server.conf b/conf/bk_server.conf
index 60dace699..56f41fa57 100755
--- a/conf/bk_server.conf
+++ b/conf/bk_server.conf
@@ -143,7 +143,11 @@ journalDirectory=/tmp/bk-txn
 
 # Number of threads that should handle read requests. if zero, the reads would
 # be handled by netty threads directly.
-# numReadWorkerThreads=1
+# numReadWorkerThreads=8
+
+# Number of threads that should be used for high priority requests
+# (i.e. recovery reads and adds, and fencing).
+# numHighPriorityWorkerThreads=8
 
 # If read workers threads are enabled, limit the number of pending requests, to
 # avoid the executor queue to grow indefinitely
diff --git a/site/_data/config/bk_server.yaml b/site/_data/config/bk_server.yaml
index 30882c876..94d20efd3 100644
--- a/site/_data/config/bk_server.yaml
+++ b/site/_data/config/bk_server.yaml
@@ -70,6 +70,17 @@ groups:
     description: The bookie authentication provider factory class name. If 
this is null, no authentication will take place.
     default: null
 
+- name: Worker thread settings
+  - param: numAddWorkerThreads
+    description: The number of threads that handle write requests. If zero, 
writes are handled by [Netty threads](http://netty.io/wiki/thread-model.html) 
directly.
+    default: 1
+  - param: numReadWorkerThreads
+    description: The number of threads that handle read requests. If zero, 
reads are handled by [Netty threads](http://netty.io/wiki/thread-model.html) 
directly.
+    default: 8
+  - param: numHighPriorityWorkerThreads
+    description: The number of threads that should be used for high priority 
requests (i.e. recovery reads and adds, and fencing). If zero, these requests are 
handled by [Netty threads](http://netty.io/wiki/thread-model.html) directly.
+    default: 8
+
 - name: Garbage collection settings
   params:
   - param: gcWaitTime
@@ -78,12 +89,6 @@ groups:
   - param: gcOverreplicatedLedgerWaitTime
     description: How long the interval to trigger next garbage collection of 
overreplicated ledgers, in milliseconds. This should not be run very frequently 
since we read the metadata for all the ledgers on the bookie from zk.
     default: 86400000
-  - param: numAddWorkerThreads
-    description: The number of threads that handle write requests. if zero, 
writes are handled by [Netty threads](http://netty.io/wiki/thread-model.html) 
directly.
-    default: 1
-  - param: numReadWorkerThreads
-    description: The umber of threads that handle read requests. If 0, reads 
are handled by [Netty threads](http://netty.io/wiki/thread-model.html) directly.
-    default: 1
   - param: isForceGCAllowWhenNoSpace
     description: Whether force compaction is allowed when the disk is full or 
almost full. Forcing GC may get some space back, but may also fill up disk 
space more quickly. This is because new log files are created before GC, while 
old garbage log files are deleted after GC.
     default: 'false'
diff --git 
a/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
 
b/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
index 60d909600..9d39d3890 100644
--- 
a/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
+++ 
b/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
@@ -34,6 +34,7 @@
 import org.apache.bookkeeper.common.concurrent.FutureEventListener;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.proto.checksum.DigestManager;
@@ -122,7 +123,8 @@ public void readEntryComplete(int rc, long lid, long eid, 
ByteBuf buffer, Object
         ArrayList<BookieSocketAddress> ensemble = 
lh.getLedgerMetadata().getEnsemble(eid);
         for (int i = 0; i < writeSet.size(); i++) {
             int idx = writeSet.get(i);
-            bookieClient.readEntry(ensemble.get(idx), lh.getId(), eid, 
readEntryCallback, ensemble.get(idx));
+            bookieClient.readEntry(ensemble.get(idx), lh.getId(), eid, 
readEntryCallback,
+                                   ensemble.get(idx), 
BookieProtocol.FLAG_NONE);
         }
     }
 
@@ -226,7 +228,8 @@ public void readLacs(final LedgerHandle lh, long eid,
         ArrayList<BookieSocketAddress> ensemble = 
lh.getLedgerMetadata().getEnsemble(eid);
         for (int i = 0; i < writeSet.size(); i++) {
             int idx = writeSet.get(i);
-            bookieClient.readEntry(ensemble.get(idx), lh.getId(), eid, 
readEntryCallback, ensemble.get(idx));
+            bookieClient.readEntry(ensemble.get(idx), lh.getId(), eid, 
readEntryCallback,
+                                   ensemble.get(idx), 
BookieProtocol.FLAG_NONE);
         }
     }
 }
diff --git 
a/tests/backward-compat/recovery-no-password/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatRecoveryNoPassword.groovy
 
b/tests/backward-compat/recovery-no-password/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatRecoveryNoPassword.groovy
index d24a890b4..7540f7073 100644
--- 
a/tests/backward-compat/recovery-no-password/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatRecoveryNoPassword.groovy
+++ 
b/tests/backward-compat/recovery-no-password/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatRecoveryNoPassword.groovy
@@ -34,6 +34,7 @@ import org.apache.bookkeeper.client.LedgerHandle
 import org.apache.bookkeeper.client.LedgerMetadata
 import org.apache.bookkeeper.conf.ClientConfiguration
 import org.apache.bookkeeper.net.BookieSocketAddress
+import org.apache.bookkeeper.proto.BookieProtocol
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback
 import org.apache.bookkeeper.tests.BookKeeperClusterUtils
 import org.apache.bookkeeper.tests.DockerUtils
@@ -125,7 +126,7 @@ class TestCompatRecoveryNoPassword {
             for (long i = startEntryId; i < endEntryId; i++) {
                 for (BookieSocketAddress addr : e.getValue()) {
                     bookkeeper.getBookieClient()
-                        .readEntry(addr, lh.getId(), i, cb, addr)
+                        .readEntry(addr, lh.getId(), i, cb, addr, 
BookieProtocol.FLAG_NONE)
                 }
             }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to