This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new ee0ddde  BOOKKEEPER-772: Reorder Read Sequence
ee0ddde is described below

commit ee0dddee6849d1968500af666571df668d34393a
Author: Sijie Guo <si...@apache.org>
AuthorDate: Tue Jul 4 12:27:21 2017 +0800

    BOOKKEEPER-772: Reorder Read Sequence
    
    Descriptions of the changes in this PR:
    
        - for rack-aware placement policy, the bookie in the same rack will be 
preferred.
        - for region-aware placement policy, the bookie in the same region will 
be preferred.
        - any readonly or unavailable bookies (those with a high score in the bookie 
failure history) will be placed at the last position in the sequence.
    
    This change is based on #220. Please review gitsha 1f7dccd for the reorder 
change.
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Enrico Olivelli <None>, Matteo Merli <None>
    
    This closes #224 from sijie/reorder_reads
---
 .../java/org/apache/bookkeeper/bookie/Bookie.java  |   7 +
 .../org/apache/bookkeeper/bookie/FileInfo.java     | 155 ++++---
 .../bookkeeper/bookie/IndexPersistenceMgr.java     |  30 +-
 .../bookie/InterleavedLedgerStorage.java           |  25 +-
 .../bookie/LastAddConfirmedUpdateNotification.java |  31 ++
 .../org/apache/bookkeeper/bookie/LedgerCache.java  |   6 +-
 .../apache/bookkeeper/bookie/LedgerCacheImpl.java  |   8 +
 .../apache/bookkeeper/bookie/LedgerDescriptor.java |   3 +
 .../bookkeeper/bookie/LedgerDescriptorImpl.java    |   9 +-
 .../apache/bookkeeper/bookie/LedgerEntryPage.java  |  33 +-
 .../apache/bookkeeper/bookie/LedgerStorage.java    |  14 +-
 .../bookkeeper/bookie/ShortReadException.java      |  37 ++
 .../apache/bookkeeper/client/AsyncCallback.java    |  16 +
 .../org/apache/bookkeeper/client/BookKeeper.java   |  22 +
 .../bookkeeper/client/BookKeeperClientStats.java   |   2 +
 .../org/apache/bookkeeper/client/LedgerHandle.java |  94 +++++
 .../apache/bookkeeper/client/PendingReadOp.java    |   8 +-
 ...eadOp.java => ReadLastConfirmedAndEntryOp.java} | 457 +++++++++++----------
 .../bookkeeper/conf/ClientConfiguration.java       | 163 +++++++-
 .../bookkeeper/conf/ServerConfiguration.java       |  65 +++
 .../org/apache/bookkeeper/proto/BookieClient.java  |  46 ++-
 .../bookkeeper/proto/BookieRequestProcessor.java   |  53 ++-
 .../proto/LongPollReadEntryProcessorV3.java        | 226 ++++++++++
 .../bookkeeper/proto/PerChannelBookieClient.java   |  68 ++-
 .../apache/bookkeeper/bookie/TestSyncThread.java   |   7 +
 .../client/TestReadLastConfirmedAndEntry.java      | 267 ++++++++++++
 .../client/TestReadLastConfirmedLongPoll.java      | 169 ++++++++
 .../org/apache/bookkeeper/meta/GcLedgersTest.java  |   7 +
 .../bookkeeper/meta/LedgerManagerTestCase.java     |  17 +-
 .../bookkeeper/test/BookKeeperClusterTestCase.java |  13 +
 30 files changed, 1744 insertions(+), 314 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 789cf33..d607b53 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -45,6 +45,8 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Observable;
+import java.util.Observer;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
@@ -1491,6 +1493,11 @@ public class Bookie extends BookieCriticalThread {
         LedgerDescriptor handle = handles.getReadOnlyHandle(ledgerId);
         return handle.getLastAddConfirmed();
     }
+    
+    public Observable waitForLastAddConfirmedUpdate(long ledgerId, long 
previoisLAC, Observer observer) throws IOException {
+        LedgerDescriptor handle = handles.getReadOnlyHandle(ledgerId);
+        return handle.waitForLastAddConfirmedUpdate(previoisLAC, observer);
+    }
 
     // The rest of the code is test stuff
     static class CounterCallback implements WriteCallback {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
index 5aeb385..597fcd3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
@@ -23,22 +23,21 @@ package org.apache.bookkeeper.bookie;
 
 import static com.google.common.base.Charsets.UTF_8;
 
+import com.google.common.annotations.VisibleForTesting;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.util.Observable;
+import java.util.Observer;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-
 /**
  * This is the file handle for a ledger's index file that maps entry ids to 
location.
  * It is used by LedgerCache.
@@ -57,7 +56,7 @@ import io.netty.buffer.Unpooled;
  * <b>Index page</b> is a fixed-length page, which contains serveral entries 
which point to the offsets of data stored in entry loggers.
  * </p>
  */
-class FileInfo {
+class FileInfo extends Observable {
     private final static Logger LOG = LoggerFactory.getLogger(FileInfo.class);
 
     static final int NO_MASTER_KEY = -1;
@@ -102,14 +101,34 @@ class FileInfo {
         return lac;
     }
 
-    synchronized long setLastAddConfirmed(long lac) {
-        if (null == this.lac || this.lac < lac) {
-            this.lac = lac;
+    long setLastAddConfirmed(long lac) {
+        long lacToReturn;
+        synchronized (this) {
+            if (null == this.lac || this.lac < lac) {
+                this.lac = lac;
+                setChanged();
+            }
+            lacToReturn = this.lac;
+        }
+        LOG.trace("Updating LAC {} , {}", lacToReturn, lac);
+
+
+        notifyObservers(new LastAddConfirmedUpdateNotification(lacToReturn));
+        return lacToReturn;
+    }
+
+    synchronized Observable waitForLastAddConfirmedUpdate(long previousLAC, 
Observer observe) {
+        if ((null != lac && lac > previousLAC)
+                || isClosed || ((stateBits & STATE_FENCED_BIT) == 
STATE_FENCED_BIT)) {
+            LOG.trace("Wait For LAC {} , {}", this.lac, previousLAC);
+            return null;
         }
-        return this.lac;
+
+        addObserver(observe);
+        return this;
     }
 
-    public File getLf() {
+    public synchronized File getLf() {
         return lf;
     }
 
@@ -170,15 +189,15 @@ class FileInfo {
             }
             bb.flip();
             if (bb.getInt() != signature) {
-                throw new IOException("Missing ledger signature");
+                throw new IOException("Missing ledger signature while reading 
header for " + lf);
             }
             int version = bb.getInt();
             if (version != headerVersion) {
-                throw new IOException("Incompatible ledger version " + 
version);
+                throw new IOException("Incompatible ledger version " + version 
+ " while reading header for " + lf);
             }
             int length = bb.getInt();
             if (length < 0) {
-                throw new IOException("Length " + length + " is invalid");
+                throw new IOException("Length " + length + " is invalid while 
reading header for " + lf);
             } else if (length > bb.remaining()) {
                 throw new BufferUnderflowException();
             }
@@ -187,11 +206,17 @@ class FileInfo {
             stateBits = bb.getInt();
             needFlushHeader = false;
         } else {
-            throw new IOException("Ledger index file does not exist");
+            throw new IOException("Ledger index file " + lf +" does not 
exist");
         }
     }
 
-    synchronized void checkOpen(boolean create) throws IOException {
+    @VisibleForTesting
+    void checkOpen(boolean create) throws IOException {
+        checkOpen(create, false);
+    }
+
+    private synchronized void checkOpen(boolean create, boolean 
openBeforeClose)
+            throws IOException {
         if (fc != null) {
             return;
         }
@@ -211,6 +236,10 @@ class FileInfo {
                 }
             }
         } else {
+            if (openBeforeClose) {
+                // if it is checking for close, skip reading header
+                return;
+            }
             try {
                 readHeader();
             } catch (BufferUnderflowException buf) {
@@ -246,19 +275,25 @@ class FileInfo {
      * @return true if set fence succeed, otherwise false when
      * it already fenced or failed to set fenced.
      */
-    synchronized public boolean setFenced() throws IOException {
-        checkOpen(false);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Try to set fenced state in file info {} : state bits 
{}.", lf, stateBits);
-        }
-        if ((stateBits & STATE_FENCED_BIT) != STATE_FENCED_BIT) {
-            // not fenced yet
-            stateBits |= STATE_FENCED_BIT;
-            needFlushHeader = true;
-            return true;
-        } else {
-            return false;
+    public boolean setFenced() throws IOException {
+        boolean returnVal = false;
+        synchronized (this) {
+            checkOpen(false);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Try to set fenced state in file info {} : state 
bits {}.", lf, stateBits);
+            }
+            if ((stateBits & STATE_FENCED_BIT) != STATE_FENCED_BIT) {
+                // not fenced yet
+                stateBits |= STATE_FENCED_BIT;
+                needFlushHeader = true;
+                synchronized (this) {
+                    setChanged();
+                }
+                returnVal = true;
+            }
         }
+        notifyObservers(new 
LastAddConfirmedUpdateNotification(Long.MAX_VALUE));
+        return returnVal;
     }
 
     // flush the header when header is changed
@@ -279,11 +314,28 @@ class FileInfo {
         return rc;
     }
 
-    public int read(ByteBuffer bb, long position) throws IOException {
-        return readAbsolute(bb, position + START_OF_DATA);
+    public int read(ByteBuffer bb, long position, boolean bestEffort)
+            throws IOException {
+        return readAbsolute(bb, position + START_OF_DATA, bestEffort);
     }
 
-    private int readAbsolute(ByteBuffer bb, long start) throws IOException {
+    /**
+     * Read data from position <i>start</i> to fill the byte buffer <i>bb</i>.
+     * If <i>bestEffort </i> is provided, it would return when it reaches EOF.
+     * Otherwise, it would throw {@link 
org.apache.bookkeeper.bookie.ShortReadException}
+     * if it reaches EOF.
+     *
+     * @param bb
+     *          byte buffer of data
+     * @param start
+     *          start position to read data
+     * @param bestEffort
+     *          flag indicates if it is a best-effort read
+     * @return number of bytes read
+     * @throws IOException
+     */
+    private int readAbsolute(ByteBuffer bb, long start, boolean bestEffort)
+            throws IOException {
         checkOpen(false);
         synchronized (this) {
             if (fc == null) {
@@ -297,7 +349,11 @@ class FileInfo {
                 rc = fc.read(bb, start);
             }
             if (rc <= 0) {
-                throw new IOException("Short read");
+                if (bestEffort) {
+                    return total;
+                } else {
+                    throw new ShortReadException("Short read at " + 
getLf().getPath() + "@" + start);
+                }
             }
             total += rc;
             // should move read position
@@ -307,23 +363,30 @@ class FileInfo {
     }
 
     /**
-     * Close a file info
+     * Close a file info. Generally, force should be set to true. If set to 
false metadata will not be flushed and
+     * accessing metadata before restart and recovery will be unsafe (since 
reloading from the index file will
+     * cause metadata to be lost). Setting force=false helps avoid expensive 
file create during shutdown with many
+     * dirty ledgers, and is safe because ledger metadata will be recovered 
before being accessed again.
      *
      * @param force
      *          if set to true, the index is forced to create before closed,
      *          if set to false, the index is not forced to create.
      */
-    synchronized public void close(boolean force) throws IOException {
-        isClosed = true;
-        checkOpen(force);
-        // Any time when we force close a file, we should try to flush header. 
otherwise, we might lose fence bit.
-        if (force) {
-            flushHeader();
-        }
-        if (useCount.get() == 0 && fc != null) {
-            fc.close();
-            fc = null;
+    public void close(boolean force) throws IOException {
+        synchronized (this) {
+            isClosed = true;
+            checkOpen(force, true);
+            // Any time when we force close a file, we should try to flush 
header. otherwise, we might lose fence bit.
+            if (force) {
+                flushHeader();
+            }
+            setChanged();
+            if (useCount.get() == 0 && fc != null) {
+                fc.close();
+                fc = null;
+            }
         }
+        notifyObservers(new 
LastAddConfirmedUpdateNotification(Long.MAX_VALUE));
     }
 
     synchronized public long write(ByteBuffer[] buffs, long position) throws 
IOException {
@@ -429,7 +492,7 @@ class FileInfo {
         }
     }
 
-    public boolean delete() {
+    public synchronized boolean delete() {
         return lf.delete();
     }
 
@@ -443,7 +506,7 @@ class FileInfo {
         }
     }
 
-    public boolean isSameFile(File f) {
+    public synchronized boolean isSameFile(File f) {
         return this.lf.equals(f);
     }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
index 81c37fd..323ad62 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
@@ -20,6 +20,8 @@
  */
 package org.apache.bookkeeper.bookie;
 
+import com.google.common.annotations.VisibleForTesting;
+import io.netty.buffer.ByteBuf;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -28,9 +30,10 @@ import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map.Entry;
+import java.util.Observable;
+import java.util.Observer;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import 
org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -41,10 +44,6 @@ import org.apache.bookkeeper.util.SnapshotMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-
-import io.netty.buffer.ByteBuf;
-
 import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.LEDGER_CACHE_NUM_EVICTED_LEDGERS;
 import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.NUM_OPEN_LEDGERS;
 
@@ -333,6 +332,18 @@ public class IndexPersistenceMgr {
             }
         }
     }
+    
+    Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, 
Observer observer) throws IOException {
+        FileInfo fi = null;
+        try {
+            fi = getFileInfo(ledgerId, null);
+            return fi.waitForLastAddConfirmedUpdate(previoisLAC, observer);
+        } finally {
+            if (null != fi) {
+                fi.release();
+            }
+        }
+    }
 
     long updateLastAddConfirmed(long ledgerId, long lac) throws IOException {
         FileInfo fi = null;
@@ -626,7 +637,14 @@ public class IndexPersistenceMgr {
                 if (position < 0) {
                     position = 0;
                 }
-                fi.read(bb, position);
+                // we read the last page from file size minus page size, so it 
should not encounter short read
+                // exception. if it does, it is an unexpected situation, then 
throw the exception and fail it immediately.
+                try {
+                    fi.read(bb, position, false);
+                } catch (ShortReadException sre) {
+                    // throw a more meaningful exception with ledger id
+                    throw new ShortReadException("Short read on ledger " + 
ledgerId + " : ", sre);
+                }
                 bb.flip();
                 long startingEntryId = position / 
LedgerEntryPage.getIndexEntrySize();
                 for (int i = entriesPerPage - 1; i >= 0; i--) {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index b8a6e53..3d84877 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -21,24 +21,23 @@
 
 package org.apache.bookkeeper.bookie;
 
+import com.google.common.collect.Lists;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
-
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Observable;
+import java.util.Observer;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.EntryLogger.EntryLogListener;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
-
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
-
-import java.util.Map;
-import java.util.NavigableMap;
-
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.proto.BookieProtocol;
@@ -49,8 +48,6 @@ import org.apache.bookkeeper.util.SnapshotMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-
 import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_ENTRY;
 import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_OFFSET;
 
@@ -264,6 +261,12 @@ public class InterleavedLedgerStorage implements 
CompactableLedgerStorage, Entry
     }
 
     @Override
+    public Observable waitForLastAddConfirmedUpdate(long ledgerId, long 
previoisLAC, Observer observer) throws IOException {
+        return ledgerCache.waitForLastAddConfirmedUpdate(ledgerId, 
previoisLAC, observer);
+    }
+
+
+    @Override
     synchronized public long addEntry(ByteBuf entry) throws IOException {
         long ledgerId = entry.getLong(entry.readerIndex() + 0);
         long entryId = entry.getLong(entry.readerIndex() + 8);
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
new file mode 100644
index 0000000..81cd842
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
@@ -0,0 +1,31 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+public class LastAddConfirmedUpdateNotification {
+    public long lastAddConfirmed;
+    public long timestamp;
+
+    public LastAddConfirmedUpdateNotification(long lastAddConfirmed) {
+        this.lastAddConfirmed = lastAddConfirmed;
+        this.timestamp = System.currentTimeMillis();
+    }
+}
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
index c55592f..26d5245 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
@@ -21,10 +21,11 @@
 
 package org.apache.bookkeeper.bookie;
 
+import io.netty.buffer.ByteBuf;
 import java.io.Closeable;
 import java.io.IOException;
-
-import io.netty.buffer.ByteBuf;
+import java.util.Observable;
+import java.util.Observer;
 
 /**
  * This class maps a ledger entry number into a location (entrylogid, offset) 
in
@@ -48,6 +49,7 @@ interface LedgerCache extends Closeable {
 
     Long getLastAddConfirmed(long ledgerId) throws IOException;
     long updateLastAddConfirmed(long ledgerId, long lac) throws IOException;
+    Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, 
Observer observer) throws IOException;
 
     void deleteLedger(long ledgerId) throws IOException;
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
index 8f1c56f..5709ce6 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
@@ -23,6 +23,8 @@ package org.apache.bookkeeper.bookie;
 
 import java.io.IOException;
 
+import java.util.Observable;
+import java.util.Observer;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -85,6 +87,12 @@ public class LedgerCacheImpl implements LedgerCache {
     }
 
     @Override
+    public Observable waitForLastAddConfirmedUpdate(long ledgerId, long 
previoisLAC, Observer observer) throws IOException {
+        return indexPersistenceManager.waitForLastAddConfirmedUpdate(ledgerId, 
previoisLAC, observer);
+    }
+
+
+    @Override
     public void putEntryOffset(long ledger, long entry, long offset) throws 
IOException {
         indexPageManager.putEntryOffset(ledger, entry, offset);
     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
index 9fe1629..032dfe2 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
@@ -24,6 +24,8 @@ package org.apache.bookkeeper.bookie;
 import io.netty.buffer.ByteBuf;
 
 import java.io.IOException;
+import java.util.Observable;
+import java.util.Observer;
 
 /**
  * Implements a ledger inside a bookie. In particular, it implements operations
@@ -58,6 +60,7 @@ public abstract class LedgerDescriptor {
     abstract ByteBuf readEntry(long entryId) throws IOException;
 
     abstract long getLastAddConfirmed() throws IOException;
+    abstract Observable waitForLastAddConfirmedUpdate(long previoisLAC, 
Observer observer) throws IOException;
 
     abstract void setExplicitLac(ByteBuf entry) throws IOException;
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
index a1e0fc0..c2246bf 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
@@ -22,10 +22,10 @@
 package org.apache.bookkeeper.bookie;
 
 import io.netty.buffer.ByteBuf;
-
 import java.io.IOException;
 import java.util.Arrays;
-
+import java.util.Observable;
+import java.util.Observer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -101,4 +101,9 @@ public class LedgerDescriptorImpl extends LedgerDescriptor {
     long getLastAddConfirmed() throws IOException {
         return ledgerStorage.getLastAddConfirmed(ledgerId);
     }
+
+    @Override
+    Observable waitForLastAddConfirmedUpdate(long previoisLAC, Observer 
observer) throws IOException {
+        return ledgerStorage.waitForLastAddConfirmedUpdate(ledgerId, 
previoisLAC, observer);
+    }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
index 2d6f80d..5aee2fe 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
@@ -21,18 +21,22 @@
 
 package org.apache.bookkeeper.bookie;
 
-import org.apache.bookkeeper.proto.BookieProtocol;
-import org.apache.bookkeeper.util.ZeroBuffer;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.proto.BookieProtocol;
+import org.apache.bookkeeper.util.ZeroBuffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This is a page in the LedgerCache. It holds the locations
  * (entrylogfile, offset) for entry ids.
  */
 public class LedgerEntryPage {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(LedgerEntryPage.class);
+
     private final static int indexEntrySize = 8;
     private final int pageSize;
     private final int entriesPerPage;
@@ -153,11 +157,24 @@ public class LedgerEntryPage {
     public void readPage(FileInfo fi) throws IOException {
         checkPage();
         page.clear();
-        while(page.remaining() != 0) {
-            if (fi.read(page, getFirstEntryPosition()) <= 0) {
-                throw new IOException("Short page read of ledger " + 
getLedger()
-                                + " tried to get " + page.capacity() + " from 
position " + getFirstEntryPosition()
-                                + " still need " + page.remaining());
+        try {
+            fi.read(page, getFirstEntryPosition(), true);
+        } catch (ShortReadException sre) {
+            throw new ShortReadException("Short page read of ledger " + 
getLedger()
+                    + " tried to get " + page.capacity() + " from position "
+                    + getFirstEntryPosition() + " still need " + 
page.remaining(), sre);
+        } catch (IllegalArgumentException iae) {
+            LOG.error("IllegalArgumentException when trying to read ledger {} 
from position {}"
+                , new Object[]{getLedger(), getFirstEntryPosition(), iae});
+            throw iae;
+        }
+        // make sure we don't include partial index entry
+        if (page.remaining() != 0) {
+            LOG.info("Short page read of ledger {} : tried to read {} bytes 
from position {}, but only {} bytes read.",
+                     new Object[] { getLedger(), page.capacity(), 
getFirstEntryPosition(), page.position() });
+            if (page.position() % indexEntrySize != 0) {
+                int partialIndexEntryStart = page.position() - page.position() 
% indexEntrySize;
+                page.putLong(partialIndexEntryStart, 0L);
             }
         }
         last = getLastEntryIndex();
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
index 9d2161e..9fc9390 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
@@ -22,9 +22,9 @@
 package org.apache.bookkeeper.bookie;
 
 import io.netty.buffer.ByteBuf;
-
 import java.io.IOException;
-
+import java.util.Observable;
+import java.util.Observer;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -118,6 +118,16 @@ public interface LedgerStorage {
     long getLastAddConfirmed(long ledgerId) throws IOException;
 
     /**
+     * Wait for last add confirmed update.
+     *
+     * @param previoisLAC - The threshold beyond which we would wait for the 
update
+     * @param observer  - Observer to notify on update
+     * @return
+     * @throws IOException
+     */
+    Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, 
Observer observer) throws IOException;
+
+    /**
      * Flushes all data in the storage. Once this is called,
      * add data written to the LedgerStorage up until this point
      * has been persisted to perminant storage
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ShortReadException.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ShortReadException.java
new file mode 100644
index 0000000..302cc46
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ShortReadException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.bookie;
+
+import java.io.IOException;
+
+/**
+ * Short Read Exception. Used to distinguish short read exception with other 
{@link java.io.IOException}s.
+ */
+public class ShortReadException extends IOException {
+
+    private static final long serialVersionUID = -4201771547564923223L;
+
+    public ShortReadException(String msg) {
+        super(msg);
+    }
+
+    public ShortReadException(String msg, Throwable t) {
+        super(msg, t);
+    }
+
+}
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/AsyncCallback.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/AsyncCallback.java
index 05067d0..8f5bdea 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/AsyncCallback.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/AsyncCallback.java
@@ -139,6 +139,22 @@ public interface AsyncCallback {
         void readLastConfirmedComplete(int rc, long lastConfirmed, Object ctx);
     }
 
+    public interface ReadLastConfirmedAndEntryCallback {
+        /**
+         * Callback definition for bookie operation that allows reading the 
last add confirmed
+         * along with an entry within the last add confirmed range
+         *
+         * @param rc Return code
+         * @param lastConfirmed The entry id of the last confirmed write or
+         *                      {@link LedgerHandle#INVALID_ENTRY_ID 
INVALID_ENTRY_ID}
+         *                      if no entry has been confirmed
+         * @param entry The entry since the lastAddConfirmed entry that was 
specified when the request
+         *              was initiated
+         * @param ctx context object
+         */
+        void readLastConfirmedAndEntryComplete(int rc, long lastConfirmed, 
LedgerEntry entry, Object ctx);
+    }
+
     public interface RecoverCallback {
         /**
          * Callback definition for bookie recover operations
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 383fe3f..7895af2 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -99,6 +99,8 @@ public class BookKeeper implements AutoCloseable {
     private OpStatsLogger deleteOpLogger;
     private OpStatsLogger recoverOpLogger;
     private OpStatsLogger readOpLogger;
+    private OpStatsLogger readLacAndEntryOpLogger;
+    private OpStatsLogger readLacAndEntryRespLogger;
     private OpStatsLogger addOpLogger;
     private OpStatsLogger writeLacOpLogger;
     private OpStatsLogger readLacOpLogger;
@@ -137,8 +139,10 @@ public class BookKeeper implements AutoCloseable {
     final ClientConfiguration conf;
     final int explicitLacInterval;
     final boolean delayEnsembleChange;
+    final boolean reorderReadSequence;
 
     final Optional<SpeculativeRequestExecutionPolicy> 
readSpeculativeRequestPolicy;
+    final Optional<SpeculativeRequestExecutionPolicy> 
readLACSpeculativeRequestPolicy;
 
     // Close State
     boolean closed = false;
@@ -301,6 +305,7 @@ public class BookKeeper implements AutoCloseable {
             throws IOException, InterruptedException, KeeperException {
         this.conf = conf;
         this.delayEnsembleChange = conf.getDelayEnsembleChange();
+        this.reorderReadSequence = conf.isReorderReadSequenceEnabled();
 
         // initialize zookeeper client
         if (zkc == null) {
@@ -374,6 +379,15 @@ public class BookKeeper implements AutoCloseable {
             this.readSpeculativeRequestPolicy = 
Optional.<SpeculativeRequestExecutionPolicy>absent();
         }
 
+        if (conf.getFirstSpeculativeReadLACTimeout() > 0) {
+            this.readLACSpeculativeRequestPolicy =
+                    Optional.of((SpeculativeRequestExecutionPolicy)(new 
DefaultSpeculativeRequestExecutionPolicy(
+                        conf.getFirstSpeculativeReadLACTimeout(),
+                        conf.getMaxSpeculativeReadLACTimeout(),
+                        
conf.getSpeculativeReadLACTimeoutBackoffMultiplier())));
+        } else {
+            this.readLACSpeculativeRequestPolicy = 
Optional.<SpeculativeRequestExecutionPolicy>absent();
+        }
         // initialize main worker pool
         this.mainWorkerPool = OrderedSafeExecutor.newBuilder()
                 .name("BookKeeperClientWorker")
@@ -504,6 +518,10 @@ public class BookKeeper implements AutoCloseable {
         return readSpeculativeRequestPolicy;
     }
 
+    public Optional<SpeculativeRequestExecutionPolicy> 
getReadLACSpeculativeRequestPolicy() {
+        return readLACSpeculativeRequestPolicy;
+    }
+
     /**
      * Get the BookieClient, currently used for doing bookie recovery.
      *
@@ -1275,6 +1293,8 @@ public class BookKeeper implements AutoCloseable {
         openOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.OPEN_OP);
         recoverOpLogger = 
stats.getOpStatsLogger(BookKeeperClientStats.RECOVER_OP);
         readOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.READ_OP);
+        readLacAndEntryOpLogger = 
stats.getOpStatsLogger(BookKeeperClientStats.READ_LAST_CONFIRMED_AND_ENTRY);
+        readLacAndEntryRespLogger = 
stats.getOpStatsLogger(BookKeeperClientStats.READ_LAST_CONFIRMED_AND_ENTRY_RESPONSE);
         addOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.ADD_OP);
         writeLacOpLogger = 
stats.getOpStatsLogger(BookKeeperClientStats.WRITE_LAC_OP);
         readLacOpLogger = 
stats.getOpStatsLogger(BookKeeperClientStats.READ_LAC_OP);
@@ -1287,6 +1307,8 @@ public class BookKeeper implements AutoCloseable {
     OpStatsLogger getDeleteOpLogger() { return deleteOpLogger; }
     OpStatsLogger getRecoverOpLogger() { return recoverOpLogger; }
     OpStatsLogger getReadOpLogger() { return readOpLogger; }
+    OpStatsLogger getReadLacAndEntryOpLogger() { return 
readLacAndEntryOpLogger; }
+    OpStatsLogger getReadLacAndEntryRespLogger() { return 
readLacAndEntryRespLogger; }
     OpStatsLogger getAddOpLogger() { return addOpLogger; }
     OpStatsLogger getWriteLacOpLogger() { return writeLacOpLogger; }
     OpStatsLogger getReadLacOpLogger() { return readLacOpLogger; }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
index e98b2d6..15c6248 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
@@ -39,6 +39,8 @@ public interface BookKeeperClientStats {
     public final static String READ_OP = "READ_ENTRY";
     public final static String WRITE_LAC_OP = "WRITE_LAC";
     public final static String READ_LAC_OP = "READ_LAC";
+    public final static String READ_LAST_CONFIRMED_AND_ENTRY = 
"READ_LAST_CONFIRMED_AND_ENTRY";
+    public final static String READ_LAST_CONFIRMED_AND_ENTRY_RESPONSE = 
"READ_LAST_CONFIRMED_AND_ENTRY_RESPONSE";
     public final static String PENDING_ADDS = "NUM_PENDING_ADD";
     public final static String ENSEMBLE_CHANGES = "NUM_ENSEMBLE_CHANGE";
     public final static String LAC_UPDATE_HITS = "LAC_UPDATE_HITS";
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index bd6c7d6..a056043 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -22,6 +22,9 @@ package org.apache.bookkeeper.client;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.RateLimiter;
 import io.netty.buffer.ByteBuf;
@@ -39,6 +42,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
@@ -78,6 +82,7 @@ public class LedgerHandle implements AutoCloseable {
     final DigestManager macManager;
     final DistributionSchedule distributionSchedule;
     final RateLimiter throttler;
+    final LoadingCache<BookieSocketAddress, Long> bookieFailureHistory;
     final boolean enableParallelRecoveryRead;
     final int recoveryReadBatchSize;
 
@@ -138,6 +143,13 @@ public class LedgerHandle implements AutoCloseable {
         this.ledgerKey = password.length > 0 ? 
MacDigestManager.genDigest("ledger", password) : emptyLedgerKey;
         distributionSchedule = new RoundRobinDistributionSchedule(
                 metadata.getWriteQuorumSize(), metadata.getAckQuorumSize(), 
metadata.getEnsembleSize());
+        this.bookieFailureHistory = CacheBuilder.newBuilder()
+            
.expireAfterWrite(bk.getConf().getBookieFailureHistoryExpirationMSec(), 
TimeUnit.MILLISECONDS)
+            .build(new CacheLoader<BookieSocketAddress, Long>() {
+            public Long load(BookieSocketAddress key) {
+                return -1L;
+            }
+        });
 
         ensembleChangeCounter = 
bk.getStatsLogger().getCounter(BookKeeperClientStats.ENSEMBLE_CHANGES);
         lacUpdateHitsCounter = 
bk.getStatsLogger().getCounter(BookKeeperClientStats.LAC_UPDATE_HITS);
@@ -957,6 +969,81 @@ public class LedgerHandle implements AutoCloseable {
         new TryReadLastConfirmedOp(this, innercb, 
getLastAddConfirmed()).initiate();
     }
 
+
+    /**
+     * Asynchronously read the next entry and the latest last add confirmed.
+     * If the next entryId is less than or equal to the known last add confirmed, the call 
will read the next entry directly.
+     * If the next entryId is ahead of the known last add confirmed, the call will 
issue a long poll read
+     * to wait for the next entry <i>entryId</i>.
+     *
+     * The callback will return the latest last add confirmed and the next entry 
if it is available within the timeout period <i>timeOutInMillis</i>.
+     *
+     * @param entryId
+     *          next entry id to read
+     * @param timeOutInMillis
+     *          timeout period to wait for the entry id to be available (for 
long poll only)
+     * @param parallel
+     *          whether to issue the long poll reads in parallel
+     * @param cb
+     *          callback to return the result
+     * @param ctx
+     *          callback context
+     */
+    public void asyncReadLastConfirmedAndEntry(final long entryId,
+                                               final long timeOutInMillis,
+                                               final boolean parallel,
+                                               final 
AsyncCallback.ReadLastConfirmedAndEntryCallback cb,
+                                               final Object ctx) {
+        boolean isClosed;
+        long lac;
+        synchronized (this) {
+            isClosed = metadata.isClosed();
+            lac = metadata.getLastEntryId();
+        }
+        if (isClosed) {
+            if (entryId > lac) {
+                cb.readLastConfirmedAndEntryComplete(BKException.Code.OK, lac, 
null, ctx);
+                return;
+            }
+        } else {
+            lac = getLastAddConfirmed();
+        }
+        if (entryId <= lac) {
+            asyncReadEntries(entryId, entryId, new ReadCallback() {
+                @Override
+                public void readComplete(int rc, LedgerHandle lh, 
Enumeration<LedgerEntry> seq, Object ctx) {
+                    if (BKException.Code.OK == rc) {
+                        if (seq.hasMoreElements()) {
+                            cb.readLastConfirmedAndEntryComplete(rc, 
getLastAddConfirmed(), seq.nextElement(), ctx);
+                        } else {
+                            cb.readLastConfirmedAndEntryComplete(rc, 
getLastAddConfirmed(), null, ctx);
+                        }
+                    } else {
+                        cb.readLastConfirmedAndEntryComplete(rc, 
INVALID_ENTRY_ID, null, ctx);
+                    }
+                }
+            }, ctx);
+            return;
+        }
+        // wait for entry <i>entryId</i>
+        ReadLastConfirmedAndEntryOp.LastConfirmedAndEntryCallback innercb = 
new ReadLastConfirmedAndEntryOp.LastConfirmedAndEntryCallback() {
+            AtomicBoolean completed = new AtomicBoolean(false);
+            @Override
+            public void readLastConfirmedAndEntryComplete(int rc, long 
lastAddConfirmed, LedgerEntry entry) {
+                if (rc == BKException.Code.OK) {
+                    if (completed.compareAndSet(false, true)) {
+                        cb.readLastConfirmedAndEntryComplete(rc, 
lastAddConfirmed, entry, ctx);
+                    }
+                } else {
+                    if (completed.compareAndSet(false, true)) {
+                        cb.readLastConfirmedAndEntryComplete(rc, 
INVALID_ENTRY_ID, null, ctx);
+                    }
+                }
+            }
+        };
+        new ReadLastConfirmedAndEntryOp(this, innercb, entryId - 1, 
timeOutInMillis, bk.scheduler).parallelRead(parallel).initiate();
+    }
+
     /**
      * Context objects for synchronous call to read last confirmed.
      */
@@ -1545,6 +1632,13 @@ public class LedgerHandle implements AutoCloseable {
         bk.getLedgerManager().readLedgerMetadata(ledgerId, cb);
     }
 
+    void registerOperationFailureOnBookie(BookieSocketAddress bookie, long 
entryId) {
+        if (bk.getConf().getEnableBookieFailureTracking()) {
+            bookieFailureHistory.put(bookie, entryId);
+        }
+    }
+
+
     void recover(GenericCallback<Void> finalCb) {
         recover(finalCb, null, false);
     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index f2477c1..c820122 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -92,7 +92,13 @@ class PendingReadOp implements Enumeration<LedgerEntry>, 
ReadEntryCallback {
             super(lId, eId);
 
             this.ensemble = ensemble;
-            this.writeSet = lh.distributionSchedule.getWriteSet(entryId);
+
+            if (lh.bk.reorderReadSequence) {
+                this.writeSet = 
lh.bk.placementPolicy.reorderReadSequence(ensemble,
+                    lh.distributionSchedule.getWriteSet(entryId), 
lh.bookieFailureHistory.asMap());
+            } else {
+                this.writeSet = lh.distributionSchedule.getWriteSet(entryId);
+            }
         }
 
         /**
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
similarity index 51%
copy from 
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
copy to 
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
index f2477c1..2fab694 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
@@ -1,4 +1,4 @@
-/*
+/**
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -21,63 +21,47 @@
 package org.apache.bookkeeper.client;
 
 import io.netty.buffer.ByteBuf;
-
 import java.util.ArrayList;
 import java.util.BitSet;
-import java.util.Enumeration;
-import java.util.HashSet;
 import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ListenableFuture;
 
-import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
-import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
 import org.apache.bookkeeper.net.BookieSocketAddress;
-import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
-import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallbackCtx;
-import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.proto.BookieProtocol;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
 import org.apache.bookkeeper.util.MathUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/**
- * Sequence of entries of a ledger that represents a pending read operation.
- * When all the data read has come back, the application callback is called.
- * This class could be improved because we could start pushing data to the
- * application as soon as it arrives rather than waiting for the whole thing.
- *
- */
-class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
-    private static final Logger LOG = 
LoggerFactory.getLogger(PendingReadOp.class);
+public class ReadLastConfirmedAndEntryOp implements 
BookkeeperInternalCallbacks.ReadEntryCallback, SpeculativeRequestExectuor {
+    static final Logger LOG = 
LoggerFactory.getLogger(ReadLastConfirmedAndEntryOp.class);
 
     final private ScheduledExecutorService scheduler;
-    private ScheduledFuture<?> speculativeTask = null;
-    Queue<LedgerEntryRequest> seq;
-    Set<BookieSocketAddress> heardFromHosts;
-    BitSet heardFromHostsBitSet;
-    ReadCallback cb;
-    Object ctx;
-    LedgerHandle lh;
-    long numPendingEntries;
-    long startEntryId;
-    long endEntryId;
-    long requestTimeNanos;
-    OpStatsLogger readOpLogger;
-
+    ReadLACAndEntryRequest request;
+    final BitSet heardFromHostsBitSet;
+    final BitSet emptyResponsesFromHostsBitSet;
     final int maxMissedReadsAllowed;
     boolean parallelRead = false;
-    final AtomicBoolean complete = new AtomicBoolean(false);
+    final AtomicBoolean requestComplete = new AtomicBoolean(false);
+
+    final long requestTimeNano;
+    private final LedgerHandle lh;
+    private final LastConfirmedAndEntryCallback cb;
 
-    abstract class LedgerEntryRequest extends LedgerEntry implements 
SpeculativeRequestExectuor {
+    private int numResponsesPending;
+    private final int numEmptyResponsesAllowed;
+    private volatile boolean hasValidResponse = false;
+    private final long prevEntryId;
+    private long lastAddConfirmed;
+    private long timeOutInMillis;
+
+    abstract class ReadLACAndEntryRequest extends LedgerEntry {
 
         final AtomicBoolean complete = new AtomicBoolean(false);
 
@@ -87,12 +71,23 @@ class PendingReadOp implements Enumeration<LedgerEntry>, 
ReadEntryCallback {
 
         final ArrayList<BookieSocketAddress> ensemble;
         final List<Integer> writeSet;
+        final List<Integer> orderedEnsemble;
 
-        LedgerEntryRequest(ArrayList<BookieSocketAddress> ensemble, long lId, 
long eId) {
+        ReadLACAndEntryRequest(ArrayList<BookieSocketAddress> ensemble, long 
lId, long eId) {
             super(lId, eId);
 
             this.ensemble = ensemble;
             this.writeSet = lh.distributionSchedule.getWriteSet(entryId);
+            if (lh.bk.reorderReadSequence) {
+                this.orderedEnsemble = 
lh.bk.placementPolicy.reorderReadLACSequence(ensemble,
+                    writeSet, lh.bookieFailureHistory.asMap());
+            } else {
+                this.orderedEnsemble = writeSet;
+            }
+        }
+
+        synchronized int getFirstError() {
+            return firstError;
         }
 
         /**
@@ -112,11 +107,11 @@ class PendingReadOp implements Enumeration<LedgerEntry>, 
ReadEntryCallback {
          * @return return true if we managed to complete the entry;
          *         otherwise return false if the read entry is not complete or 
it is already completed before
          */
-        boolean complete(int bookieIndex, BookieSocketAddress host, final 
ByteBuf buffer) {
+        boolean complete(int bookieIndex, BookieSocketAddress host, final 
ByteBuf buffer, long entryId) {
             ByteBuf content;
             try {
                 content = lh.macManager.verifyDigestAndReturnData(entryId, 
buffer);
-            } catch (BKDigestMatchException e) {
+            } catch (BKException.BKDigestMatchException e) {
                 logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch", 
BKException.Code.DigestMatchException);
                 buffer.release();
                 return false;
@@ -124,6 +119,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, 
ReadEntryCallback {
 
             if (!complete.getAndSet(true)) {
                 rc = BKException.Code.OK;
+                this.entryId = entryId;
                 /*
                  * The length is a long and it is the last field of the 
metadata of an entry.
                  * Consequently, we have to subtract 8 from METADATA_LENGTH to 
get the length.
@@ -132,7 +128,6 @@ class PendingReadOp implements Enumeration<LedgerEntry>, 
ReadEntryCallback {
                 data = content;
                 return true;
             } else {
-                buffer.release();
                 return false;
             }
         }
@@ -147,13 +142,28 @@ class PendingReadOp implements Enumeration<LedgerEntry>, 
ReadEntryCallback {
         boolean fail(int rc) {
             if (complete.compareAndSet(false, true)) {
                 this.rc = rc;
-                submitCallback(rc);
+                translateAndSetFirstError(rc);
+                completeRequest();
                 return true;
             } else {
                 return false;
             }
         }
 
+        synchronized private void translateAndSetFirstError(int rc) {
+            if (BKException.Code.OK == firstError ||
+                BKException.Code.NoSuchEntryException == firstError ||
+                BKException.Code.NoSuchLedgerExistsException == firstError) {
+                firstError = rc;
+            } else if (BKException.Code.BookieHandleNotAvailableException == 
firstError &&
+                BKException.Code.NoSuchEntryException != rc &&
+                BKException.Code.NoSuchLedgerExistsException != rc) {
+                // if an exception other than NoSuchEntryException or NoSuchLedgerExistsException is 
returned,
+                // we need to update firstError to indicate that it might be a 
valid read that just failed.
+                firstError = rc;
+            }
+        }
+
         /**
          * Log error <i>errMsg</i> and reattempt read from <i>host</i>.
          *
@@ -167,30 +177,22 @@ class PendingReadOp implements Enumeration<LedgerEntry>, 
ReadEntryCallback {
          *          read result code
          */
         synchronized void logErrorAndReattemptRead(int bookieIndex, 
BookieSocketAddress host, String errMsg, int rc) {
-            if (BKException.Code.OK == firstError ||
-                BKException.Code.NoSuchEntryException == firstError ||
-                BKException.Code.NoSuchLedgerExistsException == firstError) {
-                firstError = rc;
-            } else if (BKException.Code.BookieHandleNotAvailableException == 
firstError &&
-                       BKException.Code.NoSuchEntryException != rc &&
-                       BKException.Code.NoSuchLedgerExistsException != rc) {
-                // if other exception rather than NoSuchEntryException or 
NoSuchLedgerExistsException is
-                // returned we need to update firstError to indicate that it 
might be a valid read but just
-                // failed.
-                firstError = rc;
-            }
+            translateAndSetFirstError(rc);
+
             if (BKException.Code.NoSuchEntryException == rc ||
                 BKException.Code.NoSuchLedgerExistsException == rc) {
-                ++numMissedEntryReads;
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("No such entry found on bookie.  L{} E{} bookie: 
{}",
-                        new Object[] { lh.ledgerId, entryId, host });
-                }
-            } else {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug(errMsg + " while reading L{} E{} from bookie: 
{}",
-                        new Object[]{lh.ledgerId, entryId, host});
+                // Since we send all long poll requests to every available 
node, we should only
+                // treat these errors as failures if the node from which we 
received this is part of
+                // the writeSet
+                if (this.writeSet.contains(bookieIndex)) {
+                    lh.registerOperationFailureOnBookie(host, entryId);
                 }
+                ++numMissedEntryReads;
+            }
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(errMsg + " while reading entry: " + entryId + " 
ledgerId: " + lh.ledgerId + " from bookie: "
+                    + host);
             }
         }
 
@@ -226,43 +228,20 @@ class PendingReadOp implements Enumeration<LedgerEntry>, 
ReadEntryCallback {
         public String toString() {
             return String.format("L%d-E%d", ledgerId, entryId);
         }
-
-        /**
-         * Issues a speculative request and indicates if more speculative
-         * requests should be issued
-         *
-         * @return whether more speculative requests should be issued
-         */
-        @Override
-        public ListenableFuture<Boolean> issueSpeculativeRequest() {
-            return lh.bk.mainWorkerPool.submitOrdered(lh.getId(), new 
Callable<Boolean>() {
-                @Override
-                public Boolean call() throws Exception {
-                    if (!isComplete() && null != 
maybeSendSpeculativeRead(heardFromHostsBitSet)) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("Send speculative read for {}. Hosts 
heard are {}, ensemble is {}.",
-                                new Object[] { this, heardFromHostsBitSet, 
ensemble });
-                        }
-                        return true;
-                    }
-                    return false;
-                }
-            });
-        }
     }
 
-    class ParallelReadRequest extends LedgerEntryRequest {
+    class ParallelReadRequest extends ReadLACAndEntryRequest {
 
         int numPendings;
 
         ParallelReadRequest(ArrayList<BookieSocketAddress> ensemble, long lId, 
long eId) {
             super(ensemble, lId, eId);
-            numPendings = writeSet.size();
+            numPendings = orderedEnsemble.size();
         }
 
         @Override
         void read() {
-            for (int bookieIndex : writeSet) {
+            for (int bookieIndex : orderedEnsemble) {
                 BookieSocketAddress to = ensemble.get(bookieIndex);
                 try {
                     sendReadTo(bookieIndex, to, this);
@@ -297,18 +276,20 @@ class PendingReadOp implements Enumeration<LedgerEntry>, 
ReadEntryCallback {
         }
     }
 
-    class SequenceReadRequest extends LedgerEntryRequest {
+    class SequenceReadRequest extends ReadLACAndEntryRequest {
         final static int NOT_FOUND = -1;
         int nextReplicaIndexToReadFrom = 0;
 
         final BitSet sentReplicas;
         final BitSet erroredReplicas;
+        final BitSet emptyResponseReplicas;
 
         SequenceReadRequest(ArrayList<BookieSocketAddress> ensemble, long lId, 
long eId) {
             super(ensemble, lId, eId);
 
-            this.sentReplicas = new 
BitSet(lh.getLedgerMetadata().getWriteQuorumSize());
-            this.erroredReplicas = new 
BitSet(lh.getLedgerMetadata().getWriteQuorumSize());
+            this.sentReplicas = new BitSet(orderedEnsemble.size());
+            this.erroredReplicas = new BitSet(orderedEnsemble.size());
+            this.emptyResponseReplicas = new BitSet(orderedEnsemble.size());
         }
 
         private synchronized int getNextReplicaIndexToReadFrom() {
@@ -316,7 +297,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, 
ReadEntryCallback {
         }
 
         private int getReplicaIndex(int bookieIndex) {
-            return writeSet.indexOf(bookieIndex);
+            return orderedEnsemble.indexOf(bookieIndex);
         }
 
         private BitSet getSentToBitSet() {
@@ -324,14 +305,14 @@ class PendingReadOp implements Enumeration<LedgerEntry>, 
ReadEntryCallback {
 
             for (int i = 0; i < sentReplicas.length(); i++) {
                 if (sentReplicas.get(i)) {
-                    b.set(writeSet.get(i));
+                    b.set(orderedEnsemble.get(i));
                 }
             }
             return b;
         }
 
         private boolean readsOutstanding() {
-            return (sentReplicas.cardinality() - 
erroredReplicas.cardinality()) > 0;
+            return (sentReplicas.cardinality() - erroredReplicas.cardinality() 
- emptyResponseReplicas.cardinality()) > 0;
         }
 
         /**
@@ -341,7 +322,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, 
ReadEntryCallback {
          */
         @Override
         synchronized BookieSocketAddress maybeSendSpeculativeRead(BitSet 
heardFrom) {
-            if (nextReplicaIndexToReadFrom >= 
getLedgerMetadata().getWriteQuorumSize()) {
+            if (nextReplicaIndexToReadFrom >= 
getLedgerMetadata().getEnsembleSize()) {
                 return null;
             }
 
@@ -363,7 +344,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, 
ReadEntryCallback {
         }
 
         synchronized BookieSocketAddress sendNextRead() {
-            if (nextReplicaIndexToReadFrom >= 
getLedgerMetadata().getWriteQuorumSize()) {
+            if (nextReplicaIndexToReadFrom >= 
getLedgerMetadata().getEnsembleSize()) {
                 // we are done, the read has failed from all replicas, just 
fail the
                 // read
 
@@ -379,7 +360,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, 
ReadEntryCallback {
             }
 
             int replica = nextReplicaIndexToReadFrom;
-            int bookieIndex = 
lh.distributionSchedule.getWriteSet(entryId).get(nextReplicaIndexToReadFrom);
+            int bookieIndex = orderedEnsemble.get(nextReplicaIndexToReadFrom);
             nextReplicaIndexToReadFrom++;
 
             try {
@@ -404,7 +385,12 @@ class PendingReadOp implements Enumeration<LedgerEntry>, 
ReadEntryCallback {
                 LOG.error("Received error from a host which is not in the 
ensemble {} {}.", host, ensemble);
                 return;
             }
-            erroredReplicas.set(replica);
+
+            if (BKException.Code.OK == rc) {
+                emptyResponseReplicas.set(replica);
+            } else {
+                erroredReplicas.set(replica);
+            }
 
             if (!readsOutstanding()) {
                 sendNextRead();
@@ -413,78 +399,101 @@ class PendingReadOp implements Enumeration<LedgerEntry>, 
ReadEntryCallback {
 
     }
 
-    PendingReadOp(LedgerHandle lh, ScheduledExecutorService scheduler,
-                  long startEntryId, long endEntryId, ReadCallback cb, Object 
ctx) {
-        seq = new ArrayBlockingQueue<LedgerEntryRequest>((int) ((endEntryId + 
1) - startEntryId));
-        this.cb = cb;
-        this.ctx = ctx;
+    ReadLastConfirmedAndEntryOp(LedgerHandle lh,
+                                LastConfirmedAndEntryCallback cb,
+                                long prevEntryId,
+                                long timeOutInMillis,
+                                ScheduledExecutorService scheduler) {
         this.lh = lh;
-        this.startEntryId = startEntryId;
-        this.endEntryId = endEntryId;
+        this.cb = cb;
+        this.prevEntryId = prevEntryId;
+        this.lastAddConfirmed = lh.getLastAddConfirmed();
+        this.timeOutInMillis = timeOutInMillis;
+        this.numResponsesPending = 0;
+        this.numEmptyResponsesAllowed = 
getLedgerMetadata().getWriteQuorumSize()
+                - getLedgerMetadata().getAckQuorumSize() + 1;
+        this.requestTimeNano = MathUtils.nowInNano();
         this.scheduler = scheduler;
-        numPendingEntries = endEntryId - startEntryId + 1;
-        maxMissedReadsAllowed = getLedgerMetadata().getWriteQuorumSize()
-                - getLedgerMetadata().getAckQuorumSize();
-        heardFromHosts = new HashSet<>();
+        maxMissedReadsAllowed = getLedgerMetadata().getEnsembleSize()
+            - getLedgerMetadata().getAckQuorumSize();
         heardFromHostsBitSet = new 
BitSet(getLedgerMetadata().getEnsembleSize());
-
-        readOpLogger = lh.bk.getReadOpLogger();
+        emptyResponsesFromHostsBitSet = new 
BitSet(getLedgerMetadata().getEnsembleSize());
     }
 
     protected LedgerMetadata getLedgerMetadata() {
         return lh.metadata;
     }
 
-    protected void cancelSpeculativeTask(boolean mayInterruptIfRunning) {
-        if (speculativeTask != null) {
-            speculativeTask.cancel(mayInterruptIfRunning);
-            speculativeTask = null;
-        }
-    }
-
-    PendingReadOp parallelRead(boolean enabled) {
+    ReadLastConfirmedAndEntryOp parallelRead(boolean enabled) {
         this.parallelRead = enabled;
         return this;
     }
 
-    public void initiate() {
-        long nextEnsembleChange = startEntryId, i = startEntryId;
-        this.requestTimeNanos = MathUtils.nowInNano();
-        ArrayList<BookieSocketAddress> ensemble = null;
-
-        do {
-            if (i == nextEnsembleChange) {
-                ensemble = getLedgerMetadata().getEnsemble(i);
-                nextEnsembleChange = 
getLedgerMetadata().getNextEnsembleChange(i);
-            }
-            LedgerEntryRequest entry;
-            if (parallelRead) {
-                entry = new ParallelReadRequest(ensemble, lh.ledgerId, i);
-            } else {
-                entry = new SequenceReadRequest(ensemble, lh.ledgerId, i);
-            }
-            seq.add(entry);
-            i++;
-        } while (i <= endEntryId);
-        // read the entries.
-        for (LedgerEntryRequest entry : seq) {
-            entry.read();
-            if (!parallelRead && 
lh.bk.getReadSpeculativeRequestPolicy().isPresent()) {
-                
lh.bk.getReadSpeculativeRequestPolicy().get().initiateSpeculativeRequest(scheduler,
 entry);
+    /**
+     * Speculative Read Logic
+     */
+    @Override
+    public ListenableFuture<Boolean> issueSpeculativeRequest() {
+        return lh.bk.mainWorkerPool.submitOrdered(lh.getId(), new 
Callable<Boolean>() {
+            @Override
+            public Boolean call() throws Exception {
+                if (!requestComplete.get() && !request.isComplete() &&
+                    (null != 
request.maybeSendSpeculativeRead(heardFromHostsBitSet))) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Send speculative ReadLAC {} for ledger {} 
(previousLAC: {}). Hosts heard are {}.",
+                            new Object[] {request, lh.getId(), 
lastAddConfirmed, heardFromHostsBitSet });
+                    }
+                    return true;
+                }
+                return false;
             }
+        });
+    }
+
+    public void initiate() {
+        if (parallelRead) {
+            request = new ParallelReadRequest(lh.metadata.currentEnsemble, 
lh.ledgerId, prevEntryId + 1);
+        } else {
+            request = new SequenceReadRequest(lh.metadata.currentEnsemble, 
lh.ledgerId, prevEntryId + 1);
         }
+        request.read();
+
+        if (!parallelRead && 
lh.bk.getReadLACSpeculativeRequestPolicy().isPresent()) {
+            
lh.bk.getReadLACSpeculativeRequestPolicy().get().initiateSpeculativeRequest(scheduler,
 this);
+        }
+    }
+
+    void sendReadTo(int bookieIndex, BookieSocketAddress to, 
ReadLACAndEntryRequest entry) throws InterruptedException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Calling Read LAC and Entry with {} and long polling 
interval {} on Bookie {} - Parallel {}",
+                    new Object[] { prevEntryId, timeOutInMillis, to, 
parallelRead });
+        }
+        lh.bk.bookieClient.readEntryWaitForLACUpdate(to,
+            lh.ledgerId,
+            BookieProtocol.LAST_ADD_CONFIRMED,
+            prevEntryId,
+            timeOutInMillis,
+            true,
+            this, new ReadLastConfirmedAndEntryContext(bookieIndex, to));
+        this.numResponsesPending++;
     }
 
-    private static class ReadContext implements ReadEntryCallbackCtx {
+    /**
+     * Wrapper to get all recovered data from the request
+     */
+    interface LastConfirmedAndEntryCallback {
+        public void readLastConfirmedAndEntryComplete(int rc, long 
lastAddConfirmed, LedgerEntry entry);
+    }
+
+    public static class ReadLastConfirmedAndEntryContext implements 
BookkeeperInternalCallbacks.ReadEntryCallbackCtx {
         final int bookieIndex;
-        final BookieSocketAddress to;
-        final LedgerEntryRequest entry;
+        final BookieSocketAddress bookie;
         long lac = LedgerHandle.INVALID_ENTRY_ID;
+        Optional<Long> lacUpdateTimestamp = Optional.absent();
 
-        ReadContext(int bookieIndex, BookieSocketAddress to, 
LedgerEntryRequest entry) {
+        ReadLastConfirmedAndEntryContext(int bookieIndex, BookieSocketAddress 
bookie) {
             this.bookieIndex = bookieIndex;
-            this.to = to;
-            this.entry = entry;
+            this.bookie = bookie;
         }
 
         @Override
@@ -496,84 +505,116 @@ class PendingReadOp implements Enumeration<LedgerEntry>, 
ReadEntryCallback {
         public long getLastAddConfirmed() {
             return lac;
         }
-    }
 
-    void sendReadTo(int bookieIndex, BookieSocketAddress to, 
LedgerEntryRequest entry) throws InterruptedException {
-        if (lh.throttler != null) {
-            lh.throttler.acquire();
+        public Optional<Long> getLacUpdateTimestamp() {
+            return lacUpdateTimestamp;
         }
 
-        lh.bk.bookieClient.readEntry(to, lh.ledgerId, entry.entryId,
-                                     this, new ReadContext(bookieIndex, to, 
entry));
-    }
-
-    @Override
-    public void readEntryComplete(int rc, long ledgerId, final long entryId, 
final ByteBuf buffer, Object ctx) {
-        final ReadContext rctx = (ReadContext)ctx;
-        final LedgerEntryRequest entry = rctx.entry;
-
-        if (rc != BKException.Code.OK) {
-            entry.logErrorAndReattemptRead(rctx.bookieIndex, rctx.to, "Error: 
" + BKException.getMessage(rc), rc);
-            return;
+        public void setLacUpdateTimestamp(long lacUpdateTimestamp) {
+            this.lacUpdateTimestamp = Optional.of(lacUpdateTimestamp);
         }
 
-        heardFromHosts.add(rctx.to);
-        heardFromHostsBitSet.set(rctx.bookieIndex, true);
 
-        if (entry.complete(rctx.bookieIndex, rctx.to, buffer)) {
-            lh.updateLastConfirmed(rctx.getLastAddConfirmed(), 0L);
-            submitCallback(BKException.Code.OK);
-        }
+    }
 
-        if(numPendingEntries < 0)
-            LOG.error("Read too many values for ledger {} : [{}, {}].", new 
Object[] { ledgerId,
-                    startEntryId, endEntryId });
+    private void submitCallback(int rc, long lastAddConfirmed, LedgerEntry 
entry) {
+        long latencyMicros = MathUtils.elapsedMicroSec(requestTimeNano);
+        if (BKException.Code.OK != rc) {
+            lh.bk.getReadLacAndEntryOpLogger()
+                .registerFailedEvent(latencyMicros, TimeUnit.MICROSECONDS);
+        } else {
+            lh.bk.getReadLacAndEntryOpLogger()
+                .registerSuccessfulEvent(latencyMicros, TimeUnit.MICROSECONDS);
+        }
+        cb.readLastConfirmedAndEntryComplete(rc, lastAddConfirmed, entry);
     }
 
-    protected void submitCallback(int code) {
-        if (BKException.Code.OK == code) {
-            numPendingEntries--;
-            if (numPendingEntries != 0) {
-                return;
+    @Override
+    public void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuf 
buffer, Object ctx) {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("{} received response for (lid={}, eid={}) : {}",
+                new Object[] { getClass().getName(), ledgerId, entryId, rc });
+        }
+        ReadLastConfirmedAndEntryContext rCtx = 
(ReadLastConfirmedAndEntryContext) ctx;
+        BookieSocketAddress bookie = rCtx.bookie;
+        numResponsesPending--;
+        if (BKException.Code.OK == rc) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Received lastAddConfirmed (lac={}) from bookie({}) 
for (lid={}).",
+                    new Object[] { rCtx.getLastAddConfirmed(), bookie, 
ledgerId });
             }
-        }
 
-        // ensure callback once
-        if (!complete.compareAndSet(false, true)) {
-            return;
-        }
+            if (rCtx.getLastAddConfirmed() > lastAddConfirmed) {
+                lastAddConfirmed = rCtx.getLastAddConfirmed();
+                lh.updateLastConfirmed(rCtx.getLastAddConfirmed(), 0L);
+            }
+
+            hasValidResponse = true;
 
-        long latencyNanos = MathUtils.elapsedNanos(requestTimeNanos);
-        if (code != BKException.Code.OK) {
-            long firstUnread = LedgerHandle.INVALID_ENTRY_ID;
-            for (LedgerEntryRequest req : seq) {
-                if (!req.isComplete()) {
-                    firstUnread = req.getEntryId();
-                    break;
+            if (entryId != BookieProtocol.LAST_ADD_CONFIRMED) {
+                if (request.complete(rCtx.bookieIndex, bookie, buffer, 
entryId)) {
+                    // callback immediately
+                    if (rCtx.getLacUpdateTimestamp().isPresent()) {
+                        long elapsedMicros = 
TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis() - 
rCtx.getLacUpdateTimestamp().get());
+                        elapsedMicros = Math.max(elapsedMicros, 0);
+                        lh.bk.getReadLacAndEntryRespLogger()
+                            .registerSuccessfulEvent(elapsedMicros, 
TimeUnit.MICROSECONDS);
+                    }
+
+                    submitCallback(BKException.Code.OK, lastAddConfirmed, 
request);
+                    requestComplete.set(true);
+                    heardFromHostsBitSet.set(rCtx.bookieIndex, true);
                 }
+            } else {
+                emptyResponsesFromHostsBitSet.set(rCtx.bookieIndex, true);
+                if (lastAddConfirmed > prevEntryId) {
+                    // received advanced lac
+                    completeRequest();
+                } else if(emptyResponsesFromHostsBitSet.cardinality() >= 
numEmptyResponsesAllowed) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Completed readLACAndEntry(lid = {}, 
previousEntryId = {}) after received {} empty responses ('{}').",
+                                new Object[]{ledgerId, prevEntryId, 
emptyResponsesFromHostsBitSet.cardinality(), emptyResponsesFromHostsBitSet});
+                    }
+                    completeRequest();
+                } else {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Received empty response for 
readLACAndEntry(lid = {}, previousEntryId = {}) from" +
+                                        " bookie {} @ {}, reattempting reading 
next bookie : lac = {}",
+                                new Object[]{ledgerId, prevEntryId, 
rCtx.bookieIndex,
+                                        rCtx.bookie, lastAddConfirmed});
+                    }
+                    request.logErrorAndReattemptRead(rCtx.bookieIndex, bookie, 
"Empty Response", rc);
+                }
+                return;
             }
-            LOG.error("Read of ledger entry failed: L{} E{}-E{}, Heard from {} 
: bitset = {}. First unread entry is {}",
-                    new Object[] { lh.getId(), startEntryId, endEntryId, 
heardFromHosts, heardFromHostsBitSet, firstUnread });
-            readOpLogger.registerFailedEvent(latencyNanos, 
TimeUnit.NANOSECONDS);
+        } else if (BKException.Code.UnauthorizedAccessException == rc && 
!requestComplete.get()) {
+            submitCallback(rc, lastAddConfirmed, null);
+            requestComplete.set(true);
         } else {
-            readOpLogger.registerSuccessfulEvent(latencyNanos, 
TimeUnit.NANOSECONDS);
+            request.logErrorAndReattemptRead(rCtx.bookieIndex, bookie, "Error: 
" + BKException.getMessage(rc), rc);
+            return;
+        }
+
+        if (numResponsesPending <= 0) {
+            completeRequest();
         }
-        cancelSpeculativeTask(true);
-        cb.readComplete(code, lh, PendingReadOp.this, PendingReadOp.this.ctx);
-        cb = null;
     }
 
-    @Override
-    public boolean hasMoreElements() {
-        return !seq.isEmpty();
+    private void completeRequest() {
+        if (requestComplete.compareAndSet(false, true)) {
+            if (!hasValidResponse) {
+                // no success called
+                submitCallback(request.getFirstError(), lastAddConfirmed, 
null);
+            } else {
+                // callback
+                submitCallback(BKException.Code.OK, lastAddConfirmed, null);
+            }
+        }
     }
 
     @Override
-    public LedgerEntry nextElement() throws NoSuchElementException {
-        return seq.remove();
+    public String toString() {
+        return String.format("ReadLastConfirmedAndEntryOp(lid=%d, 
prevEntryId=%d])", lh.ledgerId, prevEntryId);
     }
 
-    public int size() {
-        return seq.size();
-    }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 038437a..157c6b5 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -20,7 +20,9 @@ package org.apache.bookkeeper.conf;
 import static com.google.common.base.Charsets.UTF_8;
 import static 
org.apache.bookkeeper.util.BookKeeperConstants.FEATURE_DISABLE_ENSEMBLE_CHANGE;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
@@ -66,8 +68,12 @@ public class ClientConfiguration extends 
AbstractConfiguration {
     protected final static String FIRST_SPECULATIVE_READ_TIMEOUT = 
"firstSpeculativeReadTimeout";
     protected final static String MAX_SPECULATIVE_READ_TIMEOUT = 
"maxSpeculativeReadTimeout";
     protected final static String SPECULATIVE_READ_TIMEOUT_BACKOFF_MULTIPLIER 
= "speculativeReadTimeoutBackoffMultiplier";
+    protected final static String FIRST_SPECULATIVE_READ_LAC_TIMEOUT = 
"firstSpeculativeReadLACTimeout";
+    protected final static String MAX_SPECULATIVE_READ_LAC_TIMEOUT = 
"maxSpeculativeReadLACTimeout";
+    protected final static String 
SPECULATIVE_READ_LAC_TIMEOUT_BACKOFF_MULTIPLIER = 
"speculativeReadLACTimeoutBackoffMultiplier";
     protected final static String ENABLE_PARALLEL_RECOVERY_READ = 
"enableParallelRecoveryRead";
     protected final static String RECOVERY_READ_BATCH_SIZE = 
"recoveryReadBatchSize";
+    protected final static String REORDER_READ_SEQUENCE_ENABLED = 
"reorderReadSequenceEnabled";
     // Add Parameters
     protected final static String DELAY_ENSEMBLE_CHANGE = 
"delayEnsembleChange";
     // Timeout Setting
@@ -103,7 +109,11 @@ public class ClientConfiguration extends 
AbstractConfiguration {
     // Stats
     protected final static String ENABLE_TASK_EXECUTION_STATS = 
"enableTaskExecutionStats";
     protected final static String TASK_EXECUTION_WARN_TIME_MICROS = 
"taskExecutionWarnTimeMicros";
-    
+
+    // Failure History Settings
+    protected final static String ENABLE_BOOKIE_FAILURE_TRACKING = 
"enableBookieFailureTracking";
+    protected final static String BOOKIE_FAILURE_HISTORY_EXPIRATION_MS = 
"bookieFailureHistoryExpirationMSec";
+
     // Names of dynamic features
     protected final static String DISABLE_ENSEMBLE_CHANGE_FEATURE_NAME = 
"disableEnsembleChangeFeatureName";
 
@@ -846,6 +856,27 @@ public class ClientConfiguration extends 
AbstractConfiguration {
     }
 
     /**
+     * Multiplier to use when determining time between successive speculative 
read LAC requests
+     *
+     * @return speculative read LAC timeout backoff multiplier.
+     */
+    public float getSpeculativeReadLACTimeoutBackoffMultiplier() {
+        return getFloat(SPECULATIVE_READ_LAC_TIMEOUT_BACKOFF_MULTIPLIER, 2.0f);
+    }
+
+    /**
+     * Set the multiplier to use when determining time between successive 
speculative read LAC requests
+     *
+     * @param speculativeReadLACTimeoutBackoffMultiplier
+     *          multiplier to use when determining time between successive 
speculative read LAC requests.
+     * @return client configuration.
+     */
+    public ClientConfiguration 
setSpeculativeReadLACTimeoutBackoffMultiplier(float 
speculativeReadLACTimeoutBackoffMultiplier) {
+        setProperty(SPECULATIVE_READ_LAC_TIMEOUT_BACKOFF_MULTIPLIER, 
speculativeReadLACTimeoutBackoffMultiplier);
+        return this;
+    }
+
+    /**
      * Get the max speculative read timeout.
      *
      * @return max speculative read timeout.
@@ -867,6 +898,66 @@ public class ClientConfiguration extends 
AbstractConfiguration {
     }
 
     /**
+     * Get the period of time after which the first speculative read last add 
confirmed and entry
+     * should be triggered.
+     * A speculative entry request is sent to the next replica bookie before
+     * an error or response has been received for the previous entry read 
request.
+     *
+     * A speculative entry read is only sent if we have not heard from the 
current
+     * replica bookie during the entire read operation which may comprise 
many entries.
+     *
+     * Speculative requests allow the client to avoid having to wait for the 
connect timeout
+     * in the case that a bookie has failed. It induces higher load on the 
network and on
+     * bookies. This should be taken into account before changing this 
configuration value.
+     *
+     * @return the speculative request timeout in milliseconds. Default 1500.
+     */
+    public int getFirstSpeculativeReadLACTimeout() {
+        return getInt(FIRST_SPECULATIVE_READ_LAC_TIMEOUT, 1500);
+    }
+
+
+    /**
+     * Get the maximum interval between successive speculative read last add 
confirmed and entry
+     * requests.
+     *
+     * @return the max speculative request timeout in milliseconds. Default 
5000.
+     */
+    public int getMaxSpeculativeReadLACTimeout() {
+        return getInt(MAX_SPECULATIVE_READ_LAC_TIMEOUT, 5000);
+    }
+
+    /**
+     * Set the period of time after which the first speculative read last add 
confirmed and entry
+     * should be triggered.
+     * A lower timeout will reduce read latency in the case of a failed bookie,
+     * while increasing the load on bookies and the network.
+     *
+     * The default is 1500 milliseconds. A value of 0 will disable speculative 
reads
+     * completely.
+     *
+     * @see #getFirstSpeculativeReadLACTimeout()
+     * @param timeout the timeout value, in milliseconds
+     * @return client configuration
+     */
+    public ClientConfiguration setFirstSpeculativeReadLACTimeout(int timeout) {
+        setProperty(FIRST_SPECULATIVE_READ_LAC_TIMEOUT, timeout);
+        return this;
+    }
+
+    /**
+     * Set the maximum interval between successive speculative read last add 
confirmed and entry
+     * requests.
+     *
+     * @param timeout the timeout value, in milliseconds
+     * @return client configuration
+     */
+    public ClientConfiguration setMaxSpeculativeReadLACTimeout(int timeout) {
+        setProperty(MAX_SPECULATIVE_READ_LAC_TIMEOUT, timeout);
+        return this;
+    }
+
+    /**
      * Whether to enable parallel reading in recovery read.
      *
      * @return true if enable parallel reading in recovery read. otherwise, 
return false.
@@ -909,6 +1000,34 @@ public class ClientConfiguration extends 
AbstractConfiguration {
     }
 
     /**
+     * Whether reordering of the read sequence is enabled.
+     *
+     * @return true if reorder read sequence is enabled, otherwise false.
+     */
+    public boolean isReorderReadSequenceEnabled() {
+        return getBoolean(REORDER_READ_SEQUENCE_ENABLED, false);
+    }
+
+    /**
+     * Enable/disable reordering read sequence on reading entries.
+     *
+     * <p>If this flag is enabled, the client will use
+     * {@link EnsemblePlacementPolicy#reorderReadSequence(ArrayList, List, 
Map)}
+     * to figure out a better read sequence to attempt reads from replicas and 
use
+     * {@link EnsemblePlacementPolicy#reorderReadLACSequence(ArrayList, List, 
Map)}
+     * to figure out a better read sequence to attempt long poll reads from 
replicas.
+     *
+     * <p>The order of read sequence is determined by the placement policy 
implementations.
+     *
+     * @param enabled the flag to enable/disable reorder read sequence.
+     * @return client configuration instance.
+     */
+    public ClientConfiguration setReorderReadSequenceEnabled(boolean enabled) {
+        setProperty(REORDER_READ_SEQUENCE_ENABLED, enabled);
+        return this;
+    }
+
+    /**
      * Get Ensemble Placement Policy Class.
      *
      * @return ensemble placement policy class.
@@ -1240,6 +1359,48 @@ public class ClientConfiguration extends 
AbstractConfiguration {
     }
 
     /**
+     * Whether to enable bookie failure tracking
+     *
+     * @return flag to enable/disable bookie failure tracking
+     */
+    public boolean getEnableBookieFailureTracking() {
+        return getBoolean(ENABLE_BOOKIE_FAILURE_TRACKING, true);
+    }
+
+    /**
+     * Enable/Disable bookie failure tracking.
+     *
+     * @param enabled
+     *          flag to enable/disable bookie failure tracking
+     * @return client configuration.
+     */
+    public ClientConfiguration setEnableBookieFailureTracking(boolean enabled) 
{
+        setProperty(ENABLE_BOOKIE_FAILURE_TRACKING, enabled);
+        return this;
+    }
+
+    /**
+     * Get the bookie failure tracking expiration timeout.
+     *
+     * @return bookie failure tracking expiration timeout.
+     */
+    public int getBookieFailureHistoryExpirationMSec() {
+        return getInt(BOOKIE_FAILURE_HISTORY_EXPIRATION_MS, 60000);
+    }
+
+    /**
+     * Set the bookie failure tracking expiration timeout.
+     *
+     * @param expirationMSec
+     *          bookie failure tracking expiration timeout.
+     * @return client configuration.
+     */
+    public ClientConfiguration setBookieFailureHistoryExpirationMSec(int 
expirationMSec) {
+        setProperty(BOOKIE_FAILURE_HISTORY_EXPIRATION_MS, expirationMSec);
+        return this;
+    }
+
+    /**
      * Get the name of the dynamic feature that disables ensemble change
      *
      * @return name of the dynamic feature that disables ensemble change
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index b4dd066..e5c96e8 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -115,6 +115,11 @@ public class ServerConfiguration extends 
AbstractConfiguration {
     // Worker Thread parameters.
     protected final static String NUM_ADD_WORKER_THREADS = 
"numAddWorkerThreads";
     protected final static String NUM_READ_WORKER_THREADS = 
"numReadWorkerThreads";
+    protected final static String NUM_LONG_POLL_WORKER_THREADS = 
"numLongPollWorkerThreads";
+
+    // Long poll parameters
+    protected final static String REQUEST_TIMER_TICK_DURATION_MILLISEC = 
"requestTimerTickDurationMs";
+    protected final static String REQUEST_TIMER_NO_OF_TICKS = 
"requestTimerNumTicks";
 
     protected final static String READ_BUFFER_SIZE = "readBufferSizeBytes";
     protected final static String WRITE_BUFFER_SIZE = "writeBufferSizeBytes";
@@ -1131,6 +1136,26 @@ public class ServerConfiguration extends 
AbstractConfiguration {
     }
 
     /**
+     * Set the number of threads that should handle long poll requests
+     *
+     * @param numThreads
+     *          number of threads to handle long poll requests.
+     * @return server configuration
+     */
+    public ServerConfiguration setNumLongPollWorkerThreads(int numThreads) {
+        setProperty(NUM_LONG_POLL_WORKER_THREADS, numThreads);
+        return this;
+    }
+
+    /**
+     * Get the number of threads that should handle long poll requests.
+     * @return the number of threads that handle long poll requests (default 10).
+     */
+    public int getNumLongPollWorkerThreads() {
+        return getInt(NUM_LONG_POLL_WORKER_THREADS, 10);
+    }
+
+    /**
      * Set the number of threads that would handle read requests.
      *
      * @param numThreads
@@ -1148,6 +1173,46 @@ public class ServerConfiguration extends 
AbstractConfiguration {
     public int getNumReadWorkerThreads() {
         return getInt(NUM_READ_WORKER_THREADS, 8);
     }
+    
+    /**
+     * Set the tick duration in milliseconds
+     *
+     * @param tickDuration
+     *          tick duration in milliseconds.
+     * @return server configuration
+     */
+    public ServerConfiguration setRequestTimerTickDurationMs(int tickDuration) 
{
+        setProperty(REQUEST_TIMER_TICK_DURATION_MILLISEC, tickDuration);
+        return this;
+    }
+
+    /**
+     * Get the tick duration in milliseconds.
+     * @return the request timer tick duration in milliseconds (default 10).
+     */
+    public int getRequestTimerTickDurationMs() {
+        return getInt(REQUEST_TIMER_TICK_DURATION_MILLISEC, 10);
+    }
+
+    /**
+     * Set the number of ticks per wheel for the request timer.
+     *
+     * @param tickCount
+     *          number of ticks per wheel for the request timer.
+     * @return server configuration
+     */
+    public ServerConfiguration setRequestTimerNumTicks(int tickCount) {
+        setProperty(REQUEST_TIMER_NO_OF_TICKS, tickCount);
+        return this;
+    }
+
+    /**
+     * Get the number of ticks per wheel for the request timer.
+     * @return the number of ticks per wheel for the request timer (default 1024).
+     */
+    public int getRequestTimerNumTicks() {
+        return getInt(REQUEST_TIMER_NO_OF_TICKS, 1024);
+    }
 
     /**
      * Get the number of bytes used as capacity for the write buffer. Default 
is
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index 1376048..4cbd814 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -364,17 +364,7 @@ public class BookieClient implements 
PerChannelBookieClientFactory {
                 @Override
                 public void operationComplete(final int rc, 
PerChannelBookieClient pcbc) {
                     if (rc != BKException.Code.OK) {
-                        try {
-                            executor.submitOrdered(ledgerId, new 
SafeRunnable() {
-                                @Override
-                                public void safeRun() {
-                                    cb.readEntryComplete(rc, ledgerId, 
entryId, null, ctx);
-                                }
-                            });
-                        } catch (RejectedExecutionException re) {
-                            
cb.readEntryComplete(getRc(BKException.Code.InterruptedException),
-                                    ledgerId, entryId, null, ctx);
-                        }
+                        completeRead(rc, ledgerId, entryId, null, cb, ctx);
                         return;
                     }
                     pcbc.readEntry(ledgerId, entryId, cb, ctx);
@@ -384,6 +374,40 @@ public class BookieClient implements 
PerChannelBookieClientFactory {
             closeLock.readLock().unlock();
         }
     }
+    
+    
+    public void readEntryWaitForLACUpdate(final BookieSocketAddress addr,
+                                          final long ledgerId,
+                                          final long entryId,
+                                          final long previousLAC,
+                                          final long timeOutInMillis,
+                                          final boolean piggyBackEntry,
+                                          final ReadEntryCallback cb,
+                                          final Object ctx) {
+        closeLock.readLock().lock();
+        try {
+            final PerChannelBookieClientPool client = lookupClient(addr, 
entryId);
+            if (client == null) {
+                
completeRead(BKException.Code.BookieHandleNotAvailableException,
+                        ledgerId, entryId, null, cb, ctx);
+                return;
+            }
+
+            client.obtain(new GenericCallback<PerChannelBookieClient>() {
+                @Override
+                public void operationComplete(final int rc, 
PerChannelBookieClient pcbc) {
+
+                    if (rc != BKException.Code.OK) {
+                        completeRead(rc, ledgerId, entryId, null, cb, ctx);
+                        return;
+                    }
+                    pcbc.readEntryWaitForLACUpdate(ledgerId, entryId, 
previousLAC, timeOutInMillis, piggyBackEntry, cb, ctx);
+                }
+            }, ledgerId);
+        } finally {
+            closeLock.readLock().unlock();
+        }
+    }
 
     public void getBookieInfo(final BookieSocketAddress addr, final long 
requested, final GetBookieInfoCallback cb, final Object ctx) {
         closeLock.readLock().lock();
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index ce5972e..1f20425 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -20,14 +20,18 @@
  */
 package org.apache.bookkeeper.proto;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.protobuf.ByteString;
 import io.netty.channel.Channel;
+import io.netty.util.HashedWheelTimer;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
 import org.apache.bookkeeper.auth.AuthToken;
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.processor.RequestProcessor;
+import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
@@ -49,6 +53,7 @@ import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG
 import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_REQUEST;
 import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_SCHEDULING_DELAY;
 import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAC_REQUEST;
+import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAST_ENTRY_NOENTRY_ERROR;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_LAC;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAC;
 import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_BOOKIE_INFO;
@@ -79,6 +84,17 @@ public class BookieRequestProcessor implements 
RequestProcessor {
      */
     private final OrderedSafeExecutor writeThreadPool;
 
+    /**
+     * The threadpool used to execute all long poll requests issued to this 
server
+     * after they are done waiting
+     */
+    private final OrderedSafeExecutor longPollThreadPool;
+
+    /**
+     * The Timer used to time out requests for long polling
+     */
+    private final HashedWheelTimer requestTimer;
+
     // Expose Stats
     private final BKStats bkStats = BKStats.getInstance();
     private final boolean statsEnabled;
@@ -94,6 +110,7 @@ public class BookieRequestProcessor implements 
RequestProcessor {
     final OpStatsLogger longPollWaitStats;
     final OpStatsLogger longPollReadStats;
     final OpStatsLogger longPollReadRequestStats;
+    final Counter readLastEntryNoEntryErrorCounter;
     final OpStatsLogger writeLacRequestStats;
     final OpStatsLogger writeLacStats;
     final OpStatsLogger readLacRequestStats;
@@ -108,6 +125,15 @@ public class BookieRequestProcessor implements 
RequestProcessor {
         this.bookie = bookie;
         this.readThreadPool = 
createExecutor(this.serverCfg.getNumReadWorkerThreads(), "BookieReadThread-" + 
serverCfg.getBookiePort());
         this.writeThreadPool = 
createExecutor(this.serverCfg.getNumAddWorkerThreads(), "BookieWriteThread-" + 
serverCfg.getBookiePort());
+        this.longPollThreadPool =
+            createExecutor(
+                this.serverCfg.getNumLongPollWorkerThreads(),
+                "BookieLongPollThread-" + serverCfg.getBookiePort());
+        this.requestTimer = new HashedWheelTimer(
+            new 
ThreadFactoryBuilder().setNameFormat("BookieRequestTimer-%d").build(),
+            this.serverCfg.getRequestTimerTickDurationMs(),
+            TimeUnit.MILLISECONDS, this.serverCfg.getRequestTimerNumTicks());
+
         // Expose Stats
         this.statsEnabled = serverCfg.isStatisticsEnabled();
         this.addEntryStats = statsLogger.getOpStatsLogger(ADD_ENTRY);
@@ -122,6 +148,7 @@ public class BookieRequestProcessor implements 
RequestProcessor {
         this.longPollWaitStats = 
statsLogger.getOpStatsLogger(READ_ENTRY_LONG_POLL_WAIT);
         this.longPollReadStats = 
statsLogger.getOpStatsLogger(READ_ENTRY_LONG_POLL_READ);
         this.longPollReadRequestStats = 
statsLogger.getOpStatsLogger(READ_ENTRY_LONG_POLL_REQUEST);
+        this.readLastEntryNoEntryErrorCounter = 
statsLogger.getCounter(READ_LAST_ENTRY_NOENTRY_ERROR);
         this.writeLacStats = statsLogger.getOpStatsLogger(WRITE_LAC);
         this.writeLacRequestStats = 
statsLogger.getOpStatsLogger(WRITE_LAC_REQUEST);
         this.readLacStats = statsLogger.getOpStatsLogger(READ_LAC);
@@ -231,11 +258,29 @@ public class BookieRequestProcessor implements 
RequestProcessor {
     private void processReadRequestV3(final BookkeeperProtocol.Request r, 
final Channel c) {
         ExecutorService fenceThreadPool =
           null == readThreadPool ? null : readThreadPool.chooseThread(c);
-        ReadEntryProcessorV3 read = new ReadEntryProcessorV3(r, c, this, 
fenceThreadPool);
-        if (null == readThreadPool) {
-            read.run();
+        ExecutorService lpThreadPool =
+          null == longPollThreadPool ? null : 
longPollThreadPool.chooseThread(c);
+        ReadEntryProcessorV3 read;
+        if (RequestUtils.isLongPollReadRequest(r.getReadRequest())) {
+            read = new LongPollReadEntryProcessorV3(
+                r,
+                c,
+                this,
+                fenceThreadPool,
+                lpThreadPool,
+                requestTimer);
+            if (null == longPollThreadPool) {
+                read.run();
+            } else {
+                
longPollThreadPool.submitOrdered(r.getReadRequest().getLedgerId(), read);
+            }
         } else {
-            readThreadPool.submitOrdered(r.getReadRequest().getLedgerId(), 
read);
+            read = new ReadEntryProcessorV3(r, c, this, fenceThreadPool);
+            if (null == readThreadPool) {
+                read.run();
+            } else {
+                readThreadPool.submitOrdered(r.getReadRequest().getLedgerId(), 
read);
+            }
         }
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
new file mode 100644
index 0000000..342e788
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.proto;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
+import io.netty.channel.Channel;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
+import java.io.IOException;
+import java.util.Observable;
+import java.util.Observer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Processor handling long poll read entry requests.
+ *
+ * <p>When the request carries a timeout, the processor registers itself as an
+ * {@link Observer} for last-add-confirmed (LAC) updates of the ledger and arms
+ * an expiration timer. Whichever fires first - a LAC notification greater than
+ * the client's previous LAC, or the timer - re-submits this processor to the
+ * long poll thread pool, where the read is finally performed.
+ */
+class LongPollReadEntryProcessorV3 extends ReadEntryProcessorV3 implements Observer {
+
+    private static final Logger logger = LoggerFactory.getLogger(LongPollReadEntryProcessorV3.class);
+
+    // LAC already known to the client, taken from the read request
+    private final Long previousLAC;
+    // timestamp of the first LAC notification that advanced beyond previousLAC
+    private Optional<Long> lastAddConfirmedUpdateTime = Optional.absent();
+
+    // long poll execution state (guarded by "this")
+    private final ExecutorService longPollThreadPool;
+    private final HashedWheelTimer requestTimer;
+    private Timeout expirationTimerTask = null;
+    private Future<?> deferredTask = null;
+    // set once the wait is over (LAC advanced or timeout); never reset
+    private boolean shouldReadEntry = false;
+
+    LongPollReadEntryProcessorV3(Request request,
+                                 Channel channel,
+                                 BookieRequestProcessor requestProcessor,
+                                 ExecutorService fenceThreadPool,
+                                 ExecutorService longPollThreadPool,
+                                 HashedWheelTimer requestTimer) {
+        super(request, channel, requestProcessor, fenceThreadPool);
+        this.previousLAC = readRequest.getPreviousLAC();
+        this.longPollThreadPool = longPollThreadPool;
+        this.requestTimer = requestTimer;
+    }
+
+    @Override
+    protected Long getPreviousLAC() {
+        return previousLAC;
+    }
+
+    private synchronized boolean shouldReadEntry() {
+        return shouldReadEntry;
+    }
+
+    /**
+     * Read the requested entry, or - when entry piggyback is requested - the
+     * entry following the client's previous LAC together with the known LAC.
+     *
+     * @param readResponseBuilder builder of the response being assembled
+     * @param entryId entry id carried by the request
+     * @param startTimeSw stopwatch started when the read began
+     * @return the response to send back to the client
+     * @throws IOException if the underlying read fails
+     */
+    @Override
+    protected ReadResponse readEntry(ReadResponse.Builder readResponseBuilder,
+                                     long entryId,
+                                     Stopwatch startTimeSw)
+            throws IOException {
+        if (!RequestUtils.shouldPiggybackEntry(readRequest)) {
+            return super.readEntry(readResponseBuilder, entryId, false, startTimeSw);
+        }
+
+        if (!readRequest.hasPreviousLAC() || (BookieProtocol.LAST_ADD_CONFIRMED != entryId)) {
+            // This is not a valid request - client bug?
+            logger.error("Incorrect read request, entry piggyback requested incorrectly for ledgerId {} entryId {}",
+                    ledgerId, entryId);
+            return buildResponse(readResponseBuilder, StatusCode.EBADREQ, startTimeSw);
+        }
+
+        long knownLAC = requestProcessor.bookie.readLastAddConfirmed(ledgerId);
+        readResponseBuilder.setMaxLAC(knownLAC);
+
+        if (knownLAC <= previousLAC) {
+            // nothing new to piggyback; only the LAC is returned
+            if (knownLAC < previousLAC && logger.isDebugEnabled()) {
+                logger.debug("Found smaller lac when piggy back reading lac and entry from ledger {} :" +
+                        " previous lac = {}, known lac = {}",
+                        new Object[]{ ledgerId, previousLAC, knownLAC });
+            }
+            return buildResponse(readResponseBuilder, StatusCode.EOK, startTimeSw);
+        }
+
+        // LAC advanced: piggyback the entry right after the client's previous LAC
+        long piggybackEntryId = previousLAC + 1;
+        if (lastAddConfirmedUpdateTime.isPresent()) {
+            readResponseBuilder.setLacUpdateTimestamp(lastAddConfirmedUpdateTime.get());
+        }
+        if (logger.isDebugEnabled()) {
+            logger.debug("ReadLAC Piggy Back reading entry:{} from ledger: {}", piggybackEntryId, ledgerId);
+        }
+        try {
+            return super.readEntry(readResponseBuilder, piggybackEntryId, true, startTimeSw);
+        } catch (Bookie.NoEntryException e) {
+            requestProcessor.readLastEntryNoEntryErrorCounter.inc();
+            logger.info("No entry found while piggyback reading entry {} from ledger {} : previous lac = {}",
+                    new Object[] { piggybackEntryId, ledgerId, previousLAC });
+            // piggy back is best effort and this request can fail genuinely because of striping
+            // entries across the ensemble
+            return buildResponse(readResponseBuilder, StatusCode.EOK, startTimeSw);
+        }
+    }
+
+    /**
+     * Build a response that carries only the ledger id, entry id and status.
+     */
+    private ReadResponse buildErrorResponse(StatusCode statusCode, Stopwatch sw) {
+        ReadResponse.Builder builder = ReadResponse.newBuilder()
+                .setLedgerId(ledgerId)
+                .setEntryId(entryId);
+        return buildResponse(builder, statusCode, sw);
+    }
+
+    /**
+     * Either start waiting for a LAC update or produce the read response.
+     *
+     * @return the response to send, or {@code null} when the request has been
+     *         parked and will be answered later by the deferred read
+     */
+    private ReadResponse getLongPollReadResponse() {
+        if (!shouldReadEntry() && readRequest.hasTimeOut()) {
+            if (logger.isTraceEnabled()) {
+                logger.trace("Waiting For LAC Update {}", previousLAC);
+            }
+
+            final Stopwatch startTimeSw = Stopwatch.createStarted();
+
+            final Observable observable;
+            try {
+                observable = requestProcessor.bookie.waitForLastAddConfirmedUpdate(ledgerId, previousLAC, this);
+            } catch (Bookie.NoLedgerException e) {
+                logger.info("No ledger found while longpoll reading ledger {}, previous lac = {}.",
+                        ledgerId, previousLAC);
+                return buildErrorResponse(StatusCode.ENOLEDGER, startTimeSw);
+            } catch (IOException ioe) {
+                logger.error("IOException while longpoll reading ledger {}, previous lac = {} : ",
+                        new Object[] { ledgerId, previousLAC, ioe });
+                return buildErrorResponse(StatusCode.EIO, startTimeSw);
+            }
+
+            registerSuccessfulEvent(requestProcessor.longPollPreWaitStats, startTimeSw);
+            lastPhaseStartTime.reset().start();
+
+            if (null != observable) {
+                // successfully registered observable to lac updates
+                if (logger.isTraceEnabled()) {
+                    logger.trace("Waiting For LAC Update {}: Timeout {}", previousLAC, readRequest.getTimeOut());
+                }
+                synchronized (this) {
+                    // The observer registered above may already have fired on
+                    // another thread and scheduled the deferred read. Arming
+                    // the timer after that point would leave a timeout that is
+                    // never cancelled and keeps this request reachable until
+                    // it expires, so only arm it while still waiting.
+                    if (!shouldReadEntry) {
+                        expirationTimerTask = requestTimer.newTimeout(new TimerTask() {
+                            @Override
+                            public void run(Timeout timeout) throws Exception {
+                                // When the timeout expires just get whatever is the current
+                                // readLastConfirmed
+                                LongPollReadEntryProcessorV3.this.scheduleDeferredRead(observable, true);
+                            }
+                        }, readRequest.getTimeOut(), TimeUnit.MILLISECONDS);
+                    }
+                }
+                return null;
+            }
+        }
+        // request doesn't have timeout or fail to wait, proceed to read entry
+        return getReadResponse();
+    }
+
+    @Override
+    protected void executeOp() {
+        ReadResponse readResponse = getLongPollReadResponse();
+        // a null response means the request is parked waiting for a LAC update
+        if (null != readResponse) {
+            sendResponse(readResponse);
+        }
+    }
+
+    /**
+     * Observer callback for LAC notifications; schedules the deferred read
+     * once the notified LAC is beyond the client's previous LAC.
+     */
+    @Override
+    public void update(Observable observable, Object o) {
+        LastAddConfirmedUpdateNotification newLACNotification = (LastAddConfirmedUpdateNotification)o;
+        if (newLACNotification.lastAddConfirmed > previousLAC) {
+            if (newLACNotification.lastAddConfirmed != Long.MAX_VALUE &&
+                    !lastAddConfirmedUpdateTime.isPresent()) {
+                lastAddConfirmedUpdateTime = Optional.of(newLACNotification.timestamp);
+            }
+            if (logger.isTraceEnabled()) {
+                logger.trace("Last Add Confirmed Advanced to {} for request {}",
+                        newLACNotification.lastAddConfirmed, request);
+            }
+            scheduleDeferredRead(observable, false);
+        }
+    }
+
+    /**
+     * End the wait and hand this processor to the long poll thread pool.
+     * Only the first caller (LAC notification or timeout) has any effect.
+     *
+     * @param observable the observable this processor is registered on
+     * @param timeout whether the wait ended because the timer expired
+     */
+    private synchronized void scheduleDeferredRead(Observable observable, boolean timeout) {
+        // Guard on shouldReadEntry rather than deferredTask: deferredTask stays
+        // null when the submission is rejected, which would let a later caller
+        // repeat the work below and record the wait stat twice.
+        if (shouldReadEntry) {
+            return;
+        }
+        shouldReadEntry = true;
+
+        if (logger.isTraceEnabled()) {
+            logger.trace("Deferred Task, expired: {}, request: {}", timeout, request);
+        }
+        observable.deleteObserver(this);
+        // NOTE(review): longPollThreadPool is assumed non-null here; the request
+        // processor passes null when no long poll pool exists - confirm.
+        try {
+            deferredTask = longPollThreadPool.submit(this);
+        } catch (RejectedExecutionException exc) {
+            // If the threadPool has been shutdown, simply drop the task
+            logger.debug("Long poll thread pool rejected deferred read, dropping request {}", request);
+        }
+        if (null != expirationTimerTask) {
+            expirationTimerTask.cancel();
+        }
+
+        registerEvent(timeout, requestProcessor.longPollWaitStats, lastPhaseStartTime);
+        lastPhaseStartTime.reset().start();
+    }
+}
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 2ec567b..7ecb0b4 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -66,6 +66,7 @@ import com.google.protobuf.ByteString;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeperClientStats;
 import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
+import org.apache.bookkeeper.client.ReadLastConfirmedAndEntryOp;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
@@ -728,7 +729,36 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
         }
     }
 
-    public void readEntry(final long ledgerId, final long entryId, 
ReadEntryCallback cb, Object ctx) {
+    /**
+     * Long poll read: issue a read that carries the caller's previous LAC,
+     * a timeout and, optionally, the entry piggyback flag.
+     *
+     * @param ledgerId ledger to read from
+     * @param entryId entry id placed in the read request
+     * @param previousLAC last add confirmed value already known to the caller
+     * @param timeOutInMillis timeout placed in the read request, in milliseconds
+     * @param piggyBackEntry whether to set the entry piggyback flag on the request
+     * @param cb callback completed with the read result
+     * @param ctx opaque context passed back to the callback
+     */
+    public void readEntryWaitForLACUpdate(final long ledgerId,
+                                          final long entryId,
+                                          final long previousLAC,
+                                          final long timeOutInMillis,
+                                          final boolean piggyBackEntry,
+                                          ReadEntryCallback cb,
+                                          Object ctx) {
+        readEntryInternal(ledgerId, entryId, previousLAC, timeOutInMillis, 
piggyBackEntry, cb, ctx);
+    }
+
+    /**
+     * Normal read: issue a read request without previous LAC, without
+     * timeout and without entry piggyback.
+     *
+     * @param ledgerId ledger to read from
+     * @param entryId entry to read
+     * @param cb callback completed with the read result
+     * @param ctx opaque context passed back to the callback
+     */
+    public void readEntry(final long ledgerId,
+                          final long entryId,
+                          ReadEntryCallback cb,
+                          Object ctx) {
+        // null previousLAC / timeout mean these optional fields stay unset
+        readEntryInternal(ledgerId, entryId, null, null, false, cb, ctx);
+    }
+
+    private void readEntryInternal(final long ledgerId,
+                                   final long entryId,
+                                   final Long previousLAC,
+                                   final Long timeOutInMillis,
+                                   final boolean piggyBackEntry,
+                                   final ReadEntryCallback cb,
+                                   final Object ctx) {
         Object request = null;
         CompletionKey completion = null;
         if (useV2WireProtocol) {
@@ -749,6 +779,30 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
                     .setLedgerId(ledgerId)
                     .setEntryId(entryId);
 
+            if (null != previousLAC) {
+                readBuilder = readBuilder.setPreviousLAC(previousLAC);
+            }
+
+            if (null != timeOutInMillis) {
+                // Long poll requires previousLAC
+                if (null == previousLAC) {
+                    
cb.readEntryComplete(BKException.Code.IncorrectParameterException,
+                        ledgerId, entryId, null, ctx);
+                    return;
+                }
+                readBuilder = readBuilder.setTimeOut(timeOutInMillis);
+            }
+
+            if (piggyBackEntry) {
+                // Entry piggyback requires previousLAC
+                if (null == previousLAC) {
+                    
cb.readEntryComplete(BKException.Code.IncorrectParameterException,
+                        ledgerId, entryId, null, ctx);
+                    return;
+                }
+                readBuilder = 
readBuilder.setFlag(ReadRequest.Flag.ENTRY_PIGGYBACK);
+            }
+
             request = Request.newBuilder()
                     .setHeader(headerBuilder)
                     .setReadRequest(readBuilder)
@@ -1210,7 +1264,7 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
                             if (readResponse.hasData()) {
                               data = readResponse.getData();
                             }
-                            handleReadResponse(ledgerId, entryId, status, 
data, INVALID_ENTRY_ID, completionValue);
+                            handleReadResponse(ledgerId, entryId, status, 
data, INVALID_ENTRY_ID, -1L, completionValue);
                             break;
                         }
                         default:
@@ -1302,7 +1356,11 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
                             if (readResponse.hasMaxLAC()) {
                                 maxLAC = readResponse.getMaxLAC();
                             }
-                            handleReadResponse(readResponse.getLedgerId(), 
readResponse.getEntryId(), status, buffer, maxLAC, completionValue);
+                            long lacUpdateTimestamp = -1L;
+                            if (readResponse.hasLacUpdateTimestamp()) {
+                                lacUpdateTimestamp = 
readResponse.getLacUpdateTimestamp();
+                            }
+                            handleReadResponse(readResponse.getLedgerId(), 
readResponse.getEntryId(), status, buffer, maxLAC, lacUpdateTimestamp, 
completionValue);
                             break;
                         }
                         case WRITE_LAC: {
@@ -1416,6 +1474,7 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
                             StatusCode status,
                             ByteBuf buffer,
                             long maxLAC, // max known lac piggy-back from 
bookies
+                            long lacUpdateTimestamp, // the timestamp when the 
lac is updated.
                             CompletionValue completionValue) {
         // The completion value should always be an instance of a 
ReadCompletion object when we reach here.
         ReadCompletion rc = (ReadCompletion)completionValue;
@@ -1440,6 +1499,9 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
         if (maxLAC > INVALID_ENTRY_ID && (rc.ctx instanceof 
ReadEntryCallbackCtx)) {
             ((ReadEntryCallbackCtx) rc.ctx).setLastAddConfirmed(maxLAC);
         }
+        if (lacUpdateTimestamp > -1L && (rc.ctx instanceof 
ReadLastConfirmedAndEntryOp.ReadLastConfirmedAndEntryContext)) {
+            ((ReadLastConfirmedAndEntryOp.ReadLastConfirmedAndEntryContext) 
rc.ctx).setLacUpdateTimestamp(lacUpdateTimestamp);
+        }
         rc.cb.readEntryComplete(rcToRet, ledgerId, entryId, buffer, rc.ctx);
     }
 
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
index 6be898d..0febdc7 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
@@ -24,6 +24,8 @@ import io.netty.buffer.ByteBuf;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Observable;
+import java.util.Observer;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Callable;
@@ -339,6 +341,11 @@ public class TestSyncThread {
         }
 
         @Override
+        public Observable waitForLastAddConfirmedUpdate(long ledgerId, long previousLAC, Observer observer)
+                throws IOException {
+            // This test stub never registers the observer; it always returns null.
+            return null;
+        }
+
+        @Override
         public Checkpoint checkpoint(Checkpoint checkpoint)
                 throws IOException {
             return checkpoint;
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedAndEntry.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedAndEntry.java
new file mode 100644
index 0000000..96c5c08
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedAndEntry.java
@@ -0,0 +1,267 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.zookeeper.KeeperException;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Enumeration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class TestReadLastConfirmedAndEntry extends BookKeeperClusterTestCase {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(TestReadLastConfirmedAndEntry.class);
+
+    final BookKeeper.DigestType digestType;
+
+    public TestReadLastConfirmedAndEntry() {
+        super(3);
+        this.digestType = BookKeeper.DigestType.CRC32;
+    }
+
+    /**
+     * Bookie that misbehaves when one specific entry id is read: depending on
+     * its configuration it either stalls the read for a long time or reports
+     * the entry as missing.
+     */
+    static class FakeBookie extends Bookie {
+
+        // entry id whose reads are stalled or failed
+        final long expectedEntryToFail;
+        // true: sleep before serving the read; false: throw NoEntryException
+        final boolean stallOrRespondNull;
+
+        public FakeBookie(ServerConfiguration conf, long expectedEntryToFail, 
boolean stallOrRespondNull)
+                throws InterruptedException, BookieException, KeeperException, 
IOException {
+            super(conf);
+            this.expectedEntryToFail = expectedEntryToFail;
+            this.stallOrRespondNull = stallOrRespondNull;
+        }
+
+        @Override
+        public ByteBuf readEntry(long ledgerId, long entryId)
+                throws IOException, NoLedgerException {
+            if (entryId == expectedEntryToFail) {
+                if (stallOrRespondNull) {
+                    // stall for 10 minutes (or until interrupted), then fall
+                    // through to the real read below
+                    try {
+                        Thread.sleep(600000);
+                    } catch (InterruptedException e) {
+                        // ignore
+                    }
+                } else {
+                    throw new NoEntryException(ledgerId, entryId);
+                }
+            }
+            return super.readEntry(ledgerId, entryId);
+        }
+    }
+
+    /**
+     * Replaces all bookies with {@link FakeBookie}s that fail (first bookie)
+     * or stall (remaining bookies) reads of entry 2, then issues a
+     * readLastConfirmedAndEntry for the entry after the known LAC and adds
+     * one more entry. The callback is expected to complete with OK, the
+     * advanced LAC and no entry.
+     */
+    @Test(timeout = 60000)
+    public void testAdvancedLacWithEmptyResponse() throws Exception {
+        byte[] passwd = "advanced-lac-with-empty-response".getBytes(UTF_8);
+
+        ClientConfiguration newConf = new ClientConfiguration();
+        newConf.addConfiguration(baseClientConf);
+        newConf.setAddEntryTimeout(9999999);
+        newConf.setReadEntryTimeout(9999999);
+
+        // stop existing bookies
+        stopAllBookies();
+        // add fake bookies
+        long expectedEntryIdToFail = 2;
+        for (int i = 0; i < numBookies; i++) {
+            ServerConfiguration conf = newServerConfiguration();
+            // bookie 0 throws NoEntryException, the others stall the read
+            Bookie b = new FakeBookie(conf, expectedEntryIdToFail, i != 0);
+            bs.add(startBookie(conf, b));
+            bsConfs.add(conf);
+        }
+
+        // create bookkeeper
+        BookKeeper newBk = new BookKeeper(newConf);
+        // create ledger to write some data
+        LedgerHandle lh = newBk.createLedger(3, 3, 2, digestType, passwd);
+        for (int i = 0; i <= expectedEntryIdToFail; i++) {
+            lh.addEntry("test".getBytes(UTF_8));
+        }
+
+        // open ledger to tail reading
+        LedgerHandle newLh = newBk.openLedgerNoRecovery(lh.getId(), 
digestType, passwd);
+        long lac = newLh.readLastConfirmed();
+        assertEquals(expectedEntryIdToFail - 1, lac);
+        Enumeration<LedgerEntry> entries = newLh.readEntries(0, lac);
+
+        int numReads = 0;
+        long expectedEntryId = 0L;
+        while (entries.hasMoreElements()) {
+            LedgerEntry entry = entries.nextElement();
+            assertEquals(expectedEntryId++, entry.getEntryId());
+            ++numReads;
+        }
+        assertEquals(lac + 1, numReads);
+
+        // holders start with sentinel values so an uncompleted callback is detectable
+        final AtomicInteger rcHolder = new AtomicInteger(-12345);
+        final AtomicLong lacHolder = new AtomicLong(lac);
+        final AtomicReference<LedgerEntry> entryHolder = new 
AtomicReference<LedgerEntry>(null);
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        newLh.asyncReadLastConfirmedAndEntry(newLh.getLastAddConfirmed() + 1, 
99999, false,
+                new AsyncCallback.ReadLastConfirmedAndEntryCallback() {
+            @Override
+            public void readLastConfirmedAndEntryComplete(int rc, long 
lastConfirmed, LedgerEntry entry, Object ctx) {
+                rcHolder.set(rc);
+                lacHolder.set(lastConfirmed);
+                entryHolder.set(entry);
+                latch.countDown();
+            }
+        }, null);
+
+        // adding one more entry is what advances the LAC to expectedEntryIdToFail
+        lh.addEntry("another test".getBytes(UTF_8));
+
+        latch.await();
+        assertEquals(expectedEntryIdToFail, lacHolder.get());
+        assertNull(entryHolder.get());
+        assertEquals(BKException.Code.OK, rcHolder.get());
+    }
+
+    /**
+     * Bookie that blocks {@code readLastAddConfirmed} on a latch whenever the
+     * ledger's LAC equals a configured value, letting a test control when the
+     * LAC read returns.
+     */
+    static class SlowReadLacBookie extends Bookie {
+
+        // LAC value that triggers the blocking behaviour
+        private final long lacToSlowRead;
+        // released by the test to let the blocked LAC read continue
+        private final CountDownLatch readLatch;
+
+        public SlowReadLacBookie(ServerConfiguration conf,
+                                 long lacToSlowRead, CountDownLatch readLatch)
+                throws IOException, KeeperException, InterruptedException, 
BookieException {
+            super(conf);
+            this.lacToSlowRead = lacToSlowRead;
+            this.readLatch = readLatch;
+        }
+
+        @Override
+        public long readLastAddConfirmed(long ledgerId) throws IOException {
+            long lac = super.readLastAddConfirmed(ledgerId);
+            logger.info("Last Add Confirmed for ledger {} is {}", ledgerId, 
lac);
+            if (lacToSlowRead == lac) {
+                logger.info("Suspend returning lac {} for ledger {}", lac, 
ledgerId);
+                try {
+                    readLatch.await();
+                } catch (InterruptedException e) {
+                    // no-op
+                }
+            }
+            // read again rather than returning the value sampled before waiting
+            return super.readLastAddConfirmed(ledgerId);
+        }
+    }
+
+    /**
+     * Callback that records the outcome of a readLastConfirmedAndEntry call
+     * and lets the test block until it has completed.
+     */
+    static class ReadLastConfirmedAndEntryResult implements 
AsyncCallback.ReadLastConfirmedAndEntryCallback {
+
+        // sentinel values until the callback fires
+        int rc = -1234;
+        long lac = -1234L;
+        LedgerEntry entry = null;
+        final CountDownLatch doneLatch = new CountDownLatch(1);
+
+        @Override
+        public void readLastConfirmedAndEntryComplete(int rc, long 
lastConfirmed, LedgerEntry entry, Object ctx) {
+            this.rc = rc;
+            this.lac = lastConfirmed;
+            this.entry = entry;
+            doneLatch.countDown();
+        }
+
+        // blocks until the callback above has been invoked
+        void await() throws InterruptedException {
+            doneLatch.await();
+        }
+    }
+
+    /**
+     * Restarts the first bookie as a {@link SlowReadLacBookie} that blocks LAC
+     * reads while the LAC is 0, then interleaves writes and
+     * readLastConfirmedAndEntry calls (steps 0-7 below) and checks that each
+     * call returns the expected LAC together with the requested entry.
+     */
+    @Test(timeout = 60000)
+    public void testRaceOnLastAddConfirmed() throws Exception {
+        byte[] passwd = "race-on-last-add-confirmed".getBytes(UTF_8);
+
+        ClientConfiguration newConf = new ClientConfiguration();
+        newConf.addConfiguration(baseClientConf);
+        newConf.setAddEntryTimeout(9999999);
+        newConf.setReadEntryTimeout(9999999);
+
+        final long lacToSlowRead = 0L;
+        final CountDownLatch readLatch = new CountDownLatch(1);
+
+        // stop first bookie
+        ServerConfiguration bsConf = killBookie(0);
+        // start it with a slow bookie
+        Bookie b = new SlowReadLacBookie(bsConf, lacToSlowRead, readLatch);
+        bs.add(startBookie(bsConf, b));
+        bsConfs.add(bsConf);
+
+        // create bookkeeper
+        BookKeeper newBk = new BookKeeper(newConf);
+        // create ledger
+        LedgerHandle lh = newBk.createLedger(3, 3, 3, digestType, passwd);
+        // 0) write entry 0
+        lh.addEntry("entry-0".getBytes(UTF_8));
+
+        // open ledger to read
+        LedgerHandle readLh = newBk.openLedgerNoRecovery(lh.getId(), 
digestType, passwd);
+
+        // 1) wait entry 0 to be committed
+        ReadLastConfirmedAndEntryResult readResult = new 
ReadLastConfirmedAndEntryResult();
+        readLh.asyncReadLastConfirmedAndEntry(0L, 9999999, true, readResult, 
null);
+
+        // 2) write entry 1 to commit entry 0 => lac = 0
+        lh.addEntry("entry-1".getBytes(UTF_8));
+        readResult.await();
+        assertEquals(BKException.Code.OK, readResult.rc);
+        assertEquals(0L, readResult.lac);
+        assertEquals(0L, readResult.entry.getEntryId());
+        assertEquals("entry-0", new String(readResult.entry.getEntry(), 
UTF_8));
+
+        // 3) write entry 2 to commit entry 1 => lac = 1
+        lh.addEntry("entry-2".getBytes(UTF_8));
+        // 4) count down read latch to trigger previous readLacAndEntry request
+        readLatch.countDown();
+        // 5) due to piggyback, the lac is updated to lac = 1
+        //    (polling loop; bounded only by the test timeout)
+        while (readLh.getLastAddConfirmed() < 1L) {
+            Thread.sleep(100);
+        }
+        // 6) write entry 3 to commit entry 2 => lac = 2
+        lh.addEntry("entry-3".getBytes(UTF_8));
+        // 7) readLastConfirmedAndEntry for next entry (we are expecting to 
read entry 1)
+        readResult = new ReadLastConfirmedAndEntryResult();
+        readLh.asyncReadLastConfirmedAndEntry(1L, 9999999, true, readResult, 
null);
+        readResult.await();
+        assertEquals(BKException.Code.OK, readResult.rc);
+        assertEquals(2L, readResult.lac);
+        assertEquals(1L, readResult.entry.getEntryId());
+        assertEquals("entry-1", new String(readResult.entry.getEntry(), 
UTF_8));
+
+        lh.close();
+        readLh.close();
+
+        newBk.close();
+    }
+}
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedLongPoll.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedLongPoll.java
new file mode 100644
index 0000000..094fe88
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedLongPoll.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class TestReadLastConfirmedLongPoll extends BookKeeperClusterTestCase {
+    final DigestType digestType;
+
+    public TestReadLastConfirmedLongPoll() {
+        super(6);
+        this.digestType = DigestType.CRC32;
+    }
+
+    @Test(timeout = 60000)
+    public void testReadLACLongPollWhenAllBookiesUp() throws Exception {
+        final int numEntries = 3;
+
+        final LedgerHandle lh = bkc.createLedger(3, 3, 1, digestType, 
"".getBytes());
+        LedgerHandle readLh = bkc.openLedgerNoRecovery(lh.getId(), digestType, 
"".getBytes());
+        assertEquals(LedgerHandle.INVALID_ENTRY_ID, 
readLh.getLastAddConfirmed());
+        // add entries
+        for (int i = 0; i < (numEntries - 1); i++) {
+            lh.addEntry(("data" + i).getBytes());
+        }
+        final AtomicBoolean success = new AtomicBoolean(false);
+        final AtomicInteger numCallbacks = new AtomicInteger(0);
+        final CountDownLatch firstReadComplete = new CountDownLatch(1);
+        readLh.asyncTryReadLastConfirmed(new 
AsyncCallback.ReadLastConfirmedCallback() {
+            @Override
+            public void readLastConfirmedComplete(int rc, long lastConfirmed, 
Object ctx) {
+                numCallbacks.incrementAndGet();
+                if (BKException.Code.OK == rc) {
+                    success.set(true);
+                } else {
+                    success.set(false);
+                }
+                firstReadComplete.countDown();
+            }
+        }, null);
+        firstReadComplete.await();
+        assertTrue(success.get());
+        assertTrue(numCallbacks.get() == 1);
+        assertEquals(numEntries - 3, readLh.getLastAddConfirmed());
+        // try read last confirmed again
+        success.set(false);
+        numCallbacks.set(0);
+        long entryId = readLh.getLastAddConfirmed()+1;
+        final CountDownLatch secondReadComplete = new CountDownLatch(1);
+        readLh.asyncReadLastConfirmedAndEntry(entryId++, 1000, true, new 
AsyncCallback.ReadLastConfirmedAndEntryCallback() {
+            @Override
+            public void readLastConfirmedAndEntryComplete(int rc, long 
lastConfirmed, LedgerEntry entry, Object ctx) {
+                numCallbacks.incrementAndGet();
+                if (BKException.Code.OK == rc && lastConfirmed == (numEntries 
- 2)) {
+                    success.set(true);
+                } else {
+                    success.set(false);
+                }
+                secondReadComplete.countDown();
+            }
+        }, null);
+        lh.addEntry(("data" + (numEntries - 1)).getBytes());
+        secondReadComplete.await();
+        assertTrue(success.get());
+        assertTrue(numCallbacks.get() == 1);
+        assertEquals(numEntries - 2, readLh.getLastAddConfirmed());
+
+        success.set(false);
+        numCallbacks.set(0);
+        final CountDownLatch thirdReadComplete = new CountDownLatch(1);
+        readLh.asyncReadLastConfirmedAndEntry(entryId++, 1000, false, new 
AsyncCallback.ReadLastConfirmedAndEntryCallback() {
+            @Override
+            public void readLastConfirmedAndEntryComplete(int rc, long 
lastConfirmed, LedgerEntry entry, Object ctx) {
+                numCallbacks.incrementAndGet();
+                if (BKException.Code.OK == rc && lastConfirmed == (numEntries 
- 1)) {
+                    success.set(true);
+                } else {
+                    success.set(false);
+                }
+                thirdReadComplete.countDown();
+            }
+        }, null);
+        lh.addEntry(("data" + numEntries).getBytes());
+        thirdReadComplete.await();
+        assertTrue(success.get());
+        assertTrue(numCallbacks.get() == 1);
+        assertEquals(numEntries - 1, readLh.getLastAddConfirmed());
+        lh.close();
+        readLh.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testReadLACLongPollWhenSomeBookiesDown() throws Exception {
+        final int numEntries = 3;
+        final LedgerHandle lh = bkc.createLedger(3, 1, 1, digestType, 
"".getBytes());
+        LedgerHandle readLh = bkc.openLedgerNoRecovery(lh.getId(), digestType, 
"".getBytes());
+        assertEquals(LedgerHandle.INVALID_ENTRY_ID, 
readLh.getLastAddConfirmed());
+        // add entries
+        for (int i = 0; i < numEntries; i++) {
+            lh.addEntry(("data" + i).getBytes());
+        }
+        for (int i = 0; i < numEntries; i++) {
+            ServerConfiguration[] confs = new ServerConfiguration[numEntries - 
1];
+            for (int j = 0; j < numEntries - 1; j++) {
+                int idx = (i + 1 + j) % numEntries;
+                confs[j] = 
killBookie(lh.getLedgerMetadata().currentEnsemble.get(idx));
+            }
+
+            final AtomicBoolean entryAsExpected = new AtomicBoolean(false);
+            final AtomicBoolean success = new AtomicBoolean(false);
+            final AtomicInteger numCallbacks = new AtomicInteger(0);
+            final CountDownLatch readComplete = new CountDownLatch(1);
+            final int entryId = i;
+            readLh.asyncTryReadLastConfirmed(new 
AsyncCallback.ReadLastConfirmedCallback() {
+                @Override
+                public void readLastConfirmedComplete(int rc, long 
lastConfirmed, Object ctx) {
+                    numCallbacks.incrementAndGet();
+                    if (BKException.Code.OK == rc) {
+                        success.set(true);
+                        entryAsExpected.set(lastConfirmed == (entryId - 1));
+                    } else {
+                        System.out.println("Return value" + rc);
+                        success.set(false);
+                        entryAsExpected.set(false);
+                    }
+                    readComplete.countDown();
+                }
+            }, null);
+            readComplete.await();
+            assertTrue(success.get());
+            assertTrue(entryAsExpected.get());
+            assertTrue(numCallbacks.get() == 1);
+
+            lh.close();
+            readLh.close();
+
+            // start the bookies
+            for (ServerConfiguration conf : confs) {
+                bs.add(startBookie(conf));
+                bsConfs.add(conf);
+            }
+        }
+    }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
index 48b8bab..95a0506 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
@@ -38,6 +38,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
+import java.util.Observable;
+import java.util.Observer;
 import java.util.Queue;
 import java.util.Random;
 import java.util.Set;
@@ -418,5 +420,10 @@ public class GcLedgersTest extends LedgerManagerTestCase {
         @Override
         public void flushEntriesLocationsIndex() throws IOException {
         }
+
+        @Override
+        public Observable waitForLastAddConfirmedUpdate(long ledgerId, long 
previoisLAC, Observer observer) throws IOException {
+            return null;
+        }
     }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
index 747398a..c6f2a36 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
@@ -21,12 +21,14 @@
 
 package org.apache.bookkeeper.meta;
 
+import io.netty.buffer.ByteBuf;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
 import java.util.NavigableMap;
-
+import java.util.Observable;
+import java.util.Observer;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.bookie.CheckpointSource;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
@@ -43,17 +45,12 @@ import org.junit.Before;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import io.netty.buffer.ByteBuf;
 
 /**
  * Test case to run over serveral ledger managers
  */
 @RunWith(Parameterized.class)
 public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase {
-    static final Logger LOG = 
LoggerFactory.getLogger(LedgerManagerTestCase.class);
 
     protected LedgerManagerFactory ledgerManagerFactory;
     protected LedgerManager ledgerManager = null;
@@ -208,14 +205,16 @@ public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase {
         }
 
         @Override
-        public void setExplicitlac(long ledgerId, ByteBuf lac) throws 
IOException {
-            // TODO Auto-generated method stub
+        public Observable waitForLastAddConfirmedUpdate(long ledgerId, long 
previoisLAC, Observer observer) throws IOException {
+            return null;
+        }
 
+        @Override
+        public void setExplicitlac(long ledgerId, ByteBuf lac) throws 
IOException {
         }
 
         @Override
         public ByteBuf getExplicitLac(long ledgerId) {
-            // TODO Auto-generated method stub
             return null;
         }
     }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index 9106841..a2f66bb 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -214,6 +214,19 @@ public abstract class BookKeeperClusterTestCase {
         return conf;
     }
 
+    protected void stopAllBookies() throws Exception {
+        for (BookieServer server : bs) {
+            server.shutdown();
+        }
+        bs.clear();
+    }
+
+    protected void startAllBookies() throws Exception {
+        for (ServerConfiguration conf : bsConfs) {
+            bs.add(startBookie(conf));
+        }
+    }
+
     /**
      * Get bookie address for bookie at index
      */

-- 
To stop receiving notification emails like this one, please contact
"commits@bookkeeper.apache.org" <commits@bookkeeper.apache.org>.

Reply via email to