This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-4.6
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/branch-4.6 by this push:
     new 9f73b61  ISSUE #731: refine LedgerEntry interface and implementation
9f73b61 is described below

commit 9f73b617af15b153130832a77c4b632ff095a80c
Author: Jia Zhai <[email protected]>
AuthorDate: Wed Nov 22 09:11:08 2017 +0800

    ISSUE #731: refine LedgerEntry interface and implementation
    
    In the LedgerEntry interface, getEntry() and getEntryBuffer() have completely 
different behaviours. getEntry() releases the bytebuf automatically, while 
getEntryBuffer() returns the bytebuf (if you don't call getEntry(), you are 
responsible for releasing the entry buffer).
    
    In this change:
    make getEntry() re-entrant; make LedgerEntry implement AutoCloseable; 
LedgerEntry is responsible for releasing the bytebuf it is holding.
    
    Descriptions of the changes in this PR:
    1.  add `close` and `duplicate` in `api.LedgerEntry`,
    1.  LedgerEntryImpl implements `api.LedgerEntry`,
    1.  `client.LedgerEntry` doesn't implement the new api; it wraps an 
`api.LedgerEntry`,
    1.  add `close` in `api.LastConfirmedAndEntry`,
    1.  fix build and test errors.
    
    Author: Jia Zhai <[email protected]>
    
    Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo 
<[email protected]>
    
    This closes #738 from zhaijack/issue-731, closes #731
    
    (cherry picked from commit 87d1afcc329d7261f19a6310ca9a7cf2acce4dee)
    Signed-off-by: Jia Zhai <[email protected]>
---
 .../org/apache/bookkeeper/client/LedgerEntry.java  |  22 ++-
 .../org/apache/bookkeeper/client/LedgerHandle.java |  82 +++++++++--
 .../apache/bookkeeper/client/LedgerRecoveryOp.java |   2 -
 .../client/ListenerBasedPendingReadOp.java         |  28 ++--
 .../apache/bookkeeper/client/PendingReadOp.java    | 105 +++++++-------
 .../client/ReadLastConfirmedAndEntryOp.java        |  45 +++---
 .../bookkeeper/client/SyncCallbackUtils.java       |  32 +----
 .../client/api/LastConfirmedAndEntry.java          |   9 +-
 .../apache/bookkeeper/client/api/LedgerEntry.java  |  27 ++--
 .../client/impl/LastConfirmedAndEntryImpl.java     |  45 +++++-
 .../bookkeeper/client/impl/LedgerEntryImpl.java    | 155 +++++++++++++++++++++
 .../bookkeeper/client/MockBookKeeperTestCase.java  |   9 +-
 .../apache/bookkeeper/client/TestParallelRead.java | 138 ++++++++----------
 .../bookkeeper/client/TestReadEntryListener.java   |  43 +++---
 .../bookkeeper/client/TestSpeculativeRead.java     |  10 +-
 .../bookkeeper/client/api/BookKeeperApiTest.java   |   3 +-
 16 files changed, 482 insertions(+), 273 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java
index 4e39e51..8cb31f3 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java
@@ -23,8 +23,8 @@ package org.apache.bookkeeper.client;
 import com.google.common.base.Preconditions;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufInputStream;
-
 import java.io.InputStream;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 
 /**
@@ -32,30 +32,28 @@ import org.apache.bookkeeper.conf.ClientConfiguration;
  * the entry content.
  *
  */
-public class LedgerEntry
-    implements org.apache.bookkeeper.client.api.LedgerEntry {
+public class LedgerEntry {
 
     final long ledgerId;
-    long entryId;
-    long length;
+    final long entryId;
+    final long length;
     ByteBuf data;
 
-    LedgerEntry(long lId, long eId) {
-        this.ledgerId = lId;
-        this.entryId = eId;
+    LedgerEntry(LedgerEntryImpl entry) {
+        this.ledgerId = entry.getLedgerId();
+        this.entryId = entry.getEntryId();
+        this.length = entry.getLength();
+        this.data = entry.getEntryBuffer().retain();
     }
 
-    @Override
     public long getLedgerId() {
         return ledgerId;
     }
 
-    @Override
     public long getEntryId() {
         return entryId;
     }
 
-    @Override
     public long getLength() {
         return length;
     }
@@ -68,7 +66,6 @@ public class LedgerEntry
      * @return the content of the entry
      * @throws IllegalStateException if this method is called twice
      */
-    @Override
     public byte[] getEntry() {
         Preconditions.checkState(null != data, "entry content can be accessed 
only once");
         byte[] entry = new byte[data.readableBytes()];
@@ -105,7 +102,6 @@ public class LedgerEntry
      * @throws IllegalStateException if the entry has been retrieved by {@link 
#getEntry()}
      * or {@link #getEntryInputStream()}.
      */
-    @Override
     public ByteBuf getEntryBuffer() {
         Preconditions.checkState(null != data, "entry content has been 
retrieved" +
             " by #getEntry or #getEntryInputStream");
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 60ad65f..49962a9 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -20,11 +20,14 @@
  */
 package org.apache.bookkeeper.client;
 
+import static 
org.apache.bookkeeper.client.api.BKException.Code.ClientClosedException;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.RateLimiter;
 import io.netty.buffer.ByteBuf;
@@ -49,16 +52,21 @@ import 
org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
+import org.apache.bookkeeper.client.BKException.BKIncorrectParameterException;
+import org.apache.bookkeeper.client.BKException.BKReadException;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.SyncCallbackUtils.FutureReadLastConfirmed;
 import 
org.apache.bookkeeper.client.SyncCallbackUtils.FutureReadLastConfirmedAndEntry;
-import org.apache.bookkeeper.client.SyncCallbackUtils.FutureReadResult;
 import org.apache.bookkeeper.client.SyncCallbackUtils.SyncAddCallback;
 import org.apache.bookkeeper.client.SyncCallbackUtils.SyncCloseCallback;
 import org.apache.bookkeeper.client.SyncCallbackUtils.SyncReadCallback;
 import 
org.apache.bookkeeper.client.SyncCallbackUtils.SyncReadLastConfirmedCallback;
+import org.apache.bookkeeper.client.api.BKException.Code;
 import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
 import org.apache.bookkeeper.client.api.WriteHandle;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+import org.apache.bookkeeper.common.concurrent.FutureEventListener;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
@@ -69,6 +77,7 @@ import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.Gauge;
 import 
org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback;
 import org.apache.bookkeeper.util.SafeRunnable;
+import org.apache.commons.collections4.IteratorUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -626,9 +635,20 @@ public class LedgerHandle implements WriteHandle {
      */
     @Override
     public 
CompletableFuture<Iterable<org.apache.bookkeeper.client.api.LedgerEntry>> 
read(long firstEntry, long lastEntry) {
-        FutureReadResult result = new FutureReadResult();
-        asyncReadEntries(firstEntry, lastEntry, result, null);
-        return result;
+        // Little sanity check
+        if (firstEntry < 0 || firstEntry > lastEntry) {
+            LOG.error("IncorrectParameterException on ledgerId:{} 
firstEntry:{} lastEntry:{}",
+                    new Object[] { ledgerId, firstEntry, lastEntry });
+            return FutureUtils.exception(new BKIncorrectParameterException());
+        }
+
+        if (lastEntry > lastAddConfirmed) {
+            LOG.error("ReadException on ledgerId:{} firstEntry:{} 
lastEntry:{}",
+                    new Object[] { ledgerId, firstEntry, lastEntry });
+            return FutureUtils.exception(new BKReadException());
+        }
+
+        return readEntriesInternalAsync(firstEntry, lastEntry);
     }
 
     /**
@@ -656,14 +676,58 @@ public class LedgerHandle implements WriteHandle {
      */
     @Override
     public 
CompletableFuture<Iterable<org.apache.bookkeeper.client.api.LedgerEntry>> 
readUnconfirmed(long firstEntry, long lastEntry) {
-        FutureReadResult result = new FutureReadResult();
-        asyncReadUnconfirmedEntries(firstEntry, lastEntry, result, null);
-        return result;
+        // Little sanity check
+        if (firstEntry < 0 || firstEntry > lastEntry) {
+            LOG.error("IncorrectParameterException on ledgerId:{} 
firstEntry:{} lastEntry:{}",
+                    new Object[] { ledgerId, firstEntry, lastEntry });
+            return FutureUtils.exception(new BKIncorrectParameterException());
+        }
+
+        return readEntriesInternalAsync(firstEntry, lastEntry);
     }
 
     void asyncReadEntriesInternal(long firstEntry, long lastEntry, 
ReadCallback cb, Object ctx) {
-        new PendingReadOp(this, bk.getScheduler(),
-                          firstEntry, lastEntry, cb, ctx).initiate();
+        if(!bk.isClosed()) {
+            readEntriesInternalAsync(firstEntry, lastEntry)
+                .whenCompleteAsync(new 
FutureEventListener<Iterable<org.apache.bookkeeper.client.api.LedgerEntry>>() {
+                    @Override
+                    public void 
onSuccess(Iterable<org.apache.bookkeeper.client.api.LedgerEntry> iterable) {
+                        cb.readComplete(
+                            Code.OK,
+                            LedgerHandle.this,
+                            IteratorUtils.asEnumeration(
+                                Iterators.transform(iterable.iterator(), le -> 
{
+                                    LedgerEntry entry = new 
LedgerEntry((LedgerEntryImpl) le);
+                                    le.close();
+                                    return entry;
+                                })),
+                            ctx);
+                    }
+
+                    @Override
+                    public void onFailure(Throwable cause) {
+                        if (cause instanceof BKException) {
+                            BKException bke = (BKException) cause;
+                            cb.readComplete(bke.getCode(), LedgerHandle.this, 
null, ctx);
+                        } else {
+                            cb.readComplete(Code.UnexpectedConditionException, 
LedgerHandle.this, null, ctx);
+                        }
+                    }
+                }, bk.getMainWorkerPool().chooseThread(ledgerId));
+        } else {
+            cb.readComplete(Code.ClientClosedException, LedgerHandle.this, 
null, ctx);
+        }
+    }
+
+    CompletableFuture<Iterable<org.apache.bookkeeper.client.api.LedgerEntry>> 
readEntriesInternalAsync(long firstEntry,
+                                                                               
                        long lastEntry) {
+        PendingReadOp op = new PendingReadOp(this, bk.getScheduler(), 
firstEntry, lastEntry);
+        if(!bk.isClosed()) {
+            bk.getMainWorkerPool().submitOrdered(ledgerId, op);
+        } else {
+            
op.future().completeExceptionally(BKException.create(ClientClosedException));
+        }
+        return op.future();
     }
 
     /**
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
index df7c84e..cde52a1 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
@@ -21,10 +21,8 @@ import com.google.common.annotations.VisibleForTesting;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
-import org.apache.bookkeeper.client.BKException.Code;
 import org.apache.bookkeeper.client.DigestManager.RecoveryData;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryListener;
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java
index afb21bf..290e69b 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java
@@ -20,15 +20,17 @@
  */
 package org.apache.bookkeeper.client;
 
-import java.util.NoSuchElementException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryListener;
 import org.apache.bookkeeper.util.MathUtils;
 
+@Slf4j
 class ListenerBasedPendingReadOp extends PendingReadOp {
 
     final ReadEntryListener listener;
+    final Object ctx;
 
     ListenerBasedPendingReadOp(LedgerHandle lh,
                                ScheduledExecutorService scheduler,
@@ -53,38 +55,34 @@ class ListenerBasedPendingReadOp extends PendingReadOp {
                                ReadEntryListener listener,
                                Object ctx,
                                boolean isRecoveryRead) {
-        super(lh, scheduler, startEntryId, endEntryId, null, ctx, 
isRecoveryRead);
+        super(lh, scheduler, startEntryId, endEntryId, isRecoveryRead);
         this.listener = listener;
+        this.ctx = ctx;
     }
 
     @Override
     protected void submitCallback(int code) {
         LedgerEntryRequest request;
-        while ((request = seq.peek()) != null) {
+        while (!seq.isEmpty() && (request = seq.get(0)) != null) {
             if (!request.isComplete()) {
                 return;
             }
-            seq.remove();
+            seq.remove(0);
             long latencyNanos = MathUtils.elapsedNanos(requestTimeNanos);
+            LedgerEntry entry;
             if (BKException.Code.OK == request.getRc()) {
                 readOpLogger.registerSuccessfulEvent(latencyNanos, 
TimeUnit.NANOSECONDS);
+                // callback with completed entry
+                entry = new LedgerEntry(request.entryImpl);
             } else {
                 readOpLogger.registerFailedEvent(latencyNanos, 
TimeUnit.NANOSECONDS);
+                entry = null;
             }
-            // callback with completed entry
-            listener.onEntryComplete(request.getRc(), lh, request, ctx);
+            request.close();
+            listener.onEntryComplete(request.getRc(), lh, entry, ctx);
         }
         // if all entries are already completed.
         cancelSpeculativeTask(true);
     }
 
-    @Override
-    public boolean hasMoreElements() {
-        return false;
-    }
-
-    @Override
-    public LedgerEntry nextElement() throws NoSuchElementException {
-        throw new NoSuchElementException();
-    }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index 40da31c..e31c5b7 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -20,26 +20,24 @@
  */
 package org.apache.bookkeeper.client;
 
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ListenableFuture;
 import io.netty.buffer.ByteBuf;
-
 import java.util.ArrayList;
 import java.util.BitSet;
-import java.util.Enumeration;
 import java.util.HashSet;
-import java.util.NoSuchElementException;
-import java.util.Queue;
+import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.google.common.util.concurrent.ListenableFuture;
-
-import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+import org.apache.bookkeeper.common.util.SafeRunnable;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallbackCtx;
@@ -55,16 +53,15 @@ import org.slf4j.LoggerFactory;
  * application as soon as it arrives rather than waiting for the whole thing.
  *
  */
-class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
+class PendingReadOp implements ReadEntryCallback, SafeRunnable {
     private static final Logger LOG = 
LoggerFactory.getLogger(PendingReadOp.class);
 
-    final private ScheduledExecutorService scheduler;
+    private final ScheduledExecutorService scheduler;
     private ScheduledFuture<?> speculativeTask = null;
-    Queue<LedgerEntryRequest> seq;
+    protected final List<LedgerEntryRequest> seq;
+    private final CompletableFuture<Iterable<LedgerEntry>> future;
     Set<BookieSocketAddress> heardFromHosts;
     BitSet heardFromHostsBitSet;
-    ReadCallback cb;
-    Object ctx;
     LedgerHandle lh;
     long numPendingEntries;
     long startEntryId;
@@ -77,7 +74,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, 
ReadEntryCallback {
     boolean parallelRead = false;
     final AtomicBoolean complete = new AtomicBoolean(false);
 
-    abstract class LedgerEntryRequest extends LedgerEntry implements 
SpeculativeRequestExecutor {
+    abstract class LedgerEntryRequest implements SpeculativeRequestExecutor, 
AutoCloseable {
 
         final AtomicBoolean complete = new AtomicBoolean(false);
 
@@ -87,10 +84,10 @@ class PendingReadOp implements Enumeration<LedgerEntry>, 
ReadEntryCallback {
 
         final ArrayList<BookieSocketAddress> ensemble;
         final DistributionSchedule.WriteSet writeSet;
+        final LedgerEntryImpl entryImpl;
 
         LedgerEntryRequest(ArrayList<BookieSocketAddress> ensemble, long lId, 
long eId) {
-            super(lId, eId);
-
+            this.entryImpl = LedgerEntryImpl.create(lId, eId);
             this.ensemble = ensemble;
 
             if (lh.bk.isReorderReadSequence()) {
@@ -98,12 +95,16 @@ class PendingReadOp implements Enumeration<LedgerEntry>, 
ReadEntryCallback {
                     .reorderReadSequence(
                             ensemble,
                             lh.bookieFailureHistory.asMap(),
-                            lh.distributionSchedule.getWriteSet(entryId));
+                            lh.distributionSchedule.getWriteSet(eId));
             } else {
-                writeSet = lh.distributionSchedule.getWriteSet(entryId);
+                writeSet = lh.distributionSchedule.getWriteSet(eId);
             }
         }
 
+        public void close() {
+            entryImpl.close();
+        }
+
         /**
          * Execute the read request.
          */
@@ -124,7 +125,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, 
ReadEntryCallback {
         boolean complete(int bookieIndex, BookieSocketAddress host, final 
ByteBuf buffer) {
             ByteBuf content;
             try {
-                content = lh.macManager.verifyDigestAndReturnData(entryId, 
buffer);
+                content = 
lh.macManager.verifyDigestAndReturnData(entryImpl.getEntryId(), buffer);
             } catch (BKDigestMatchException e) {
                 logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch", 
BKException.Code.DigestMatchException);
                 buffer.release();
@@ -137,8 +138,8 @@ class PendingReadOp implements Enumeration<LedgerEntry>, 
ReadEntryCallback {
                  * The length is a long and it is the last field of the 
metadata of an entry.
                  * Consequently, we have to subtract 8 from METADATA_LENGTH to 
get the length.
                  */
-                length = buffer.getLong(DigestManager.METADATA_LENGTH - 8);
-                data = content;
+                
entryImpl.setLength(buffer.getLong(DigestManager.METADATA_LENGTH - 8));
+                entryImpl.setEntryBuf(content);
                 writeSet.recycle();
                 return true;
             } else {
@@ -195,12 +196,12 @@ class PendingReadOp implements Enumeration<LedgerEntry>, 
ReadEntryCallback {
                 ++numMissedEntryReads;
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("No such entry found on bookie.  L{} E{} bookie: 
{}",
-                        new Object[] { lh.ledgerId, entryId, host });
+                        new Object[] { lh.ledgerId, entryImpl.getEntryId(), 
host });
                 }
             } else {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug(errMsg + " while reading L{} E{} from bookie: 
{}",
-                        new Object[]{lh.ledgerId, entryId, host});
+                        new Object[]{lh.ledgerId, entryImpl.getEntryId(), 
host});
                 }
             }
         }
@@ -235,7 +236,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, 
ReadEntryCallback {
 
         @Override
         public String toString() {
-            return String.format("L%d-E%d", ledgerId, entryId);
+            return String.format("L%d-E%d", entryImpl.getLedgerId(), 
entryImpl.getEntryId());
         }
 
         /**
@@ -423,16 +424,12 @@ class PendingReadOp implements Enumeration<LedgerEntry>, 
ReadEntryCallback {
     PendingReadOp(LedgerHandle lh,
                   ScheduledExecutorService scheduler,
                   long startEntryId,
-                  long endEntryId,
-                  ReadCallback cb,
-                  Object ctx) {
+                  long endEntryId) {
         this(
             lh,
             scheduler,
             startEntryId,
             endEntryId,
-            cb,
-            ctx,
             false);
     }
 
@@ -440,12 +437,9 @@ class PendingReadOp implements Enumeration<LedgerEntry>, 
ReadEntryCallback {
                   ScheduledExecutorService scheduler,
                   long startEntryId,
                   long endEntryId,
-                  ReadCallback cb,
-                  Object ctx,
                   boolean isRecoveryRead) {
-        seq = new ArrayBlockingQueue<LedgerEntryRequest>((int) ((endEntryId + 
1) - startEntryId));
-        this.cb = cb;
-        this.ctx = ctx;
+        this.seq = new ArrayList<>((int) ((endEntryId + 1) - startEntryId));
+        this.future = new CompletableFuture<>();
         this.lh = lh;
         this.startEntryId = startEntryId;
         this.endEntryId = endEntryId;
@@ -460,6 +454,10 @@ class PendingReadOp implements Enumeration<LedgerEntry>, 
ReadEntryCallback {
         readOpLogger = lh.bk.getReadOpLogger();
     }
 
+    CompletableFuture<Iterable<LedgerEntry>> future() {
+        return future;
+    }
+
     protected LedgerMetadata getLedgerMetadata() {
         return lh.metadata;
     }
@@ -476,7 +474,11 @@ class PendingReadOp implements Enumeration<LedgerEntry>, 
ReadEntryCallback {
         return this;
     }
 
-    public void initiate() {
+    public void submit() {
+        lh.bk.getMainWorkerPool().submitOrdered(lh.ledgerId, this);
+    }
+
+    void initiate() {
         long nextEnsembleChange = startEntryId, i = startEntryId;
         this.requestTimeNanos = MathUtils.nowInNano();
         ArrayList<BookieSocketAddress> ensemble = null;
@@ -503,6 +505,11 @@ class PendingReadOp implements Enumeration<LedgerEntry>, 
ReadEntryCallback {
         }
     }
 
+    @Override
+    public void safeRun() {
+        initiate();
+    }
+
     private static class ReadContext implements ReadEntryCallbackCtx {
         final int bookieIndex;
         final BookieSocketAddress to;
@@ -531,7 +538,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, 
ReadEntryCallback {
             lh.throttler.acquire();
         }
 
-        lh.bk.getBookieClient().readEntry(to, lh.ledgerId, entry.entryId,
+        lh.bk.getBookieClient().readEntry(to, lh.ledgerId, 
entry.entryImpl.getEntryId(),
                                      this, new ReadContext(bookieIndex, to, 
entry));
     }
 
@@ -574,37 +581,27 @@ class PendingReadOp implements Enumeration<LedgerEntry>, 
ReadEntryCallback {
             return;
         }
 
+        cancelSpeculativeTask(true);
+
         long latencyNanos = MathUtils.elapsedNanos(requestTimeNanos);
         if (code != BKException.Code.OK) {
             long firstUnread = LedgerHandle.INVALID_ENTRY_ID;
             for (LedgerEntryRequest req : seq) {
                 if (!req.isComplete()) {
-                    firstUnread = req.getEntryId();
+                    firstUnread = req.entryImpl.getEntryId();
                     break;
                 }
             }
             LOG.error("Read of ledger entry failed: L{} E{}-E{}, Heard from {} 
: bitset = {}. First unread entry is {}",
                     new Object[] { lh.getId(), startEntryId, endEntryId, 
heardFromHosts, heardFromHostsBitSet, firstUnread });
             readOpLogger.registerFailedEvent(latencyNanos, 
TimeUnit.NANOSECONDS);
+            // release the entries
+            seq.forEach(LedgerEntryRequest::close);
+            future.completeExceptionally(BKException.create(code));
         } else {
             readOpLogger.registerSuccessfulEvent(latencyNanos, 
TimeUnit.NANOSECONDS);
+            future.complete(Lists.transform(seq, input -> input.entryImpl));
         }
-        cancelSpeculativeTask(true);
-        cb.readComplete(code, lh, PendingReadOp.this, PendingReadOp.this.ctx);
-        cb = null;
     }
 
-    @Override
-    public boolean hasMoreElements() {
-        return !seq.isEmpty();
-    }
-
-    @Override
-    public LedgerEntry nextElement() throws NoSuchElementException {
-        return seq.remove();
-    }
-
-    public int size() {
-        return seq.size();
-    }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
index ae721d5..9ee75d9 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
@@ -24,17 +24,16 @@ import com.google.common.util.concurrent.ListenableFuture;
 import io.netty.buffer.ByteBuf;
 import java.util.ArrayList;
 import java.util.BitSet;
-import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
 import org.apache.bookkeeper.proto.ReadLastConfirmedAndEntryContext;
 import org.apache.bookkeeper.util.MathUtils;
-import org.apache.commons.lang3.ArrayUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,7 +64,7 @@ class ReadLastConfirmedAndEntryOp implements 
BookkeeperInternalCallbacks.ReadEnt
     private long lastAddConfirmed;
     private long timeOutInMillis;
 
-    abstract class ReadLACAndEntryRequest extends LedgerEntry {
+    abstract class ReadLACAndEntryRequest implements AutoCloseable {
 
         final AtomicBoolean complete = new AtomicBoolean(false);
 
@@ -76,12 +75,12 @@ class ReadLastConfirmedAndEntryOp implements 
BookkeeperInternalCallbacks.ReadEnt
         final ArrayList<BookieSocketAddress> ensemble;
         final DistributionSchedule.WriteSet writeSet;
         final DistributionSchedule.WriteSet orderedEnsemble;
+        final LedgerEntryImpl entryImpl;
 
         ReadLACAndEntryRequest(ArrayList<BookieSocketAddress> ensemble, long 
lId, long eId) {
-            super(lId, eId);
-
+            this.entryImpl = LedgerEntryImpl.create(lId, eId);
             this.ensemble = ensemble;
-            this.writeSet = lh.distributionSchedule.getWriteSet(entryId);
+            this.writeSet = lh.distributionSchedule.getWriteSet(eId);
             if (lh.bk.reorderReadSequence) {
                 this.orderedEnsemble = 
lh.bk.placementPolicy.reorderReadLACSequence(ensemble,
                         lh.bookieFailureHistory.asMap(), writeSet.copy());
@@ -90,6 +89,10 @@ class ReadLastConfirmedAndEntryOp implements 
BookkeeperInternalCallbacks.ReadEnt
             }
         }
 
+        public void close() {
+            entryImpl.close();
+        }
+
         synchronized int getFirstError() {
             return firstError;
         }
@@ -125,13 +128,12 @@ class ReadLastConfirmedAndEntryOp implements 
BookkeeperInternalCallbacks.ReadEnt
                 writeSet.recycle();
                 orderedEnsemble.recycle();
                 rc = BKException.Code.OK;
-                this.entryId = entryId;
                 /*
                  * The length is a long and it is the last field of the 
metadata of an entry.
                  * Consequently, we have to subtract 8 from METADATA_LENGTH to 
get the length.
                  */
-                length = buffer.getLong(DigestManager.METADATA_LENGTH - 8);
-                data = content;
+                
entryImpl.setLength(buffer.getLong(DigestManager.METADATA_LENGTH - 8));
+                entryImpl.setEntryBuf(content);
                 return true;
             } else {
                 return false;
@@ -193,13 +195,13 @@ class ReadLastConfirmedAndEntryOp implements 
BookkeeperInternalCallbacks.ReadEnt
                 // treat these errors as failures if the node from which we 
received this is part of
                 // the writeSet
                 if (this.writeSet.contains(bookieIndex)) {
-                    lh.registerOperationFailureOnBookie(host, entryId);
+                    lh.registerOperationFailureOnBookie(host, 
entryImpl.getEntryId());
                 }
                 ++numMissedEntryReads;
             }
 
             if (LOG.isDebugEnabled()) {
-                LOG.debug(errMsg + " while reading entry: " + entryId + " 
ledgerId: " + lh.ledgerId + " from bookie: "
+                LOG.debug(errMsg + " while reading entry: " + 
entryImpl.getEntryId() + " ledgerId: " + lh.ledgerId + " from bookie: "
                     + host);
             }
         }
@@ -234,7 +236,7 @@ class ReadLastConfirmedAndEntryOp implements 
BookkeeperInternalCallbacks.ReadEnt
 
         @Override
         public String toString() {
-            return String.format("L%d-E%d", ledgerId, entryId);
+            return String.format("L%d-E%d", entryImpl.getLedgerId(), 
entryImpl.getEntryId());
         }
     }
 
@@ -493,15 +495,24 @@ class ReadLastConfirmedAndEntryOp implements 
BookkeeperInternalCallbacks.ReadEnt
         public void readLastConfirmedAndEntryComplete(int rc, long 
lastAddConfirmed, LedgerEntry entry);
     }
 
-    private void submitCallback(int rc, long lastAddConfirmed, LedgerEntry 
entry) {
+    private void submitCallback(int rc) {
         long latencyMicros = MathUtils.elapsedMicroSec(requestTimeNano);
+        LedgerEntry entry;
         if (BKException.Code.OK != rc) {
             lh.bk.getReadLacAndEntryOpLogger()
                 .registerFailedEvent(latencyMicros, TimeUnit.MICROSECONDS);
+            entry = null;
         } else {
+            // we could have received an advanced LAC with no entry
             lh.bk.getReadLacAndEntryOpLogger()
                 .registerSuccessfulEvent(latencyMicros, TimeUnit.MICROSECONDS);
+            if (request.entryImpl.getEntryBuffer() != null) {
+                entry = new LedgerEntry(request.entryImpl);
+            } else {
+                entry = null;
+            }
         }
+        request.close();
         cb.readLastConfirmedAndEntryComplete(rc, lastAddConfirmed, entry);
     }
 
@@ -537,7 +548,7 @@ class ReadLastConfirmedAndEntryOp implements 
BookkeeperInternalCallbacks.ReadEnt
                             .registerSuccessfulEvent(elapsedMicros, 
TimeUnit.MICROSECONDS);
                     }
 
-                    submitCallback(BKException.Code.OK, lastAddConfirmed, 
request);
+                    submitCallback(BKException.Code.OK);
                     requestComplete.set(true);
                     heardFromHostsBitSet.set(rCtx.getBookieIndex(), true);
                 }
@@ -564,7 +575,7 @@ class ReadLastConfirmedAndEntryOp implements 
BookkeeperInternalCallbacks.ReadEnt
                 return;
             }
         } else if (BKException.Code.UnauthorizedAccessException == rc && 
!requestComplete.get()) {
-            submitCallback(rc, lastAddConfirmed, null);
+            submitCallback(rc);
             requestComplete.set(true);
         } else {
             request.logErrorAndReattemptRead(rCtx.getBookieIndex(), bookie, 
"Error: " + BKException.getMessage(rc), rc);
@@ -580,10 +591,10 @@ class ReadLastConfirmedAndEntryOp implements 
BookkeeperInternalCallbacks.ReadEnt
         if (requestComplete.compareAndSet(false, true)) {
             if (!hasValidResponse) {
                 // no success called
-                submitCallback(request.getFirstError(), lastAddConfirmed, 
null);
+                submitCallback(request.getFirstError());
             } else {
                 // callback
-                submitCallback(BKException.Code.OK, lastAddConfirmed, null);
+                submitCallback(BKException.Code.OK);
             }
         }
     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/SyncCallbackUtils.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/SyncCallbackUtils.java
index 13bcff8..5ac3080 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/SyncCallbackUtils.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/SyncCallbackUtils.java
@@ -17,13 +17,11 @@
  */
 package org.apache.bookkeeper.client;
 
-import com.google.common.collect.Iterators;
 import java.util.Enumeration;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
-import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.client.impl.LastConfirmedAndEntryImpl;
 
 /**
@@ -220,34 +218,6 @@ class SyncCallbackUtils {
         }
     }
 
-    static class FutureReadResult
-        extends 
CompletableFuture<Iterable<org.apache.bookkeeper.client.api.LedgerEntry>>
-        implements AsyncCallback.ReadCallback {
-
-        /**
-         * Implementation of callback interface for read method of {@link 
ReadHandle}.
-         *
-         * @param rc
-         *          return code
-         * @param lh
-         *          ledger handle
-         * @param seq
-         *          sequence of entries
-         * @param ctx
-         *          control object
-         */
-        @Override
-        @SuppressWarnings("unchecked")
-        public void readComplete(int rc, LedgerHandle lh,
-                                 Enumeration<LedgerEntry> seq, Object ctx) {
-            if (rc != BKException.Code.OK) {
-                
this.completeExceptionally(BKException.create(rc).fillInStackTrace());
-            } else {
-                this.complete((Iterable) () -> Iterators.forEnumeration(seq));
-            }
-        }
-    }
-
     static class SyncAddCallback extends CompletableFuture<Long> implements 
AsyncCallback.AddCallback {
 
         /**
@@ -320,7 +290,7 @@ class SyncCallbackUtils {
 
         @Override
         public void readLastConfirmedAndEntryComplete(int rc, long 
lastConfirmed, LedgerEntry entry, Object ctx) {
-            LastConfirmedAndEntry result = new 
LastConfirmedAndEntryImpl(lastConfirmed, entry);
+            LastConfirmedAndEntry result = 
LastConfirmedAndEntryImpl.create(lastConfirmed, entry);
             finish(rc, result, this);
         }
     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LastConfirmedAndEntry.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LastConfirmedAndEntry.java
index 3a10d96..8bbe58e 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LastConfirmedAndEntry.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LastConfirmedAndEntry.java
@@ -24,14 +24,14 @@ package org.apache.bookkeeper.client.api;
  * This contains LastAddConfirmed entryId and a LedgerEntry wanted to read.
  * It is used for readLastAddConfirmedAndEntry.
  */
-public interface LastConfirmedAndEntry {
+public interface LastConfirmedAndEntry extends AutoCloseable {
 
     /**
      * Gets LastAddConfirmed entryId.
      *
      * @return the LastAddConfirmed
      */
-    Long getLastAddConfirmed();
+    long getLastAddConfirmed();
 
     /**
      * Whether this entity contains an entry.
@@ -47,4 +47,9 @@ public interface LastConfirmedAndEntry {
      */
     LedgerEntry getEntry();
 
+    /**
+     * {@inheritDoc}
+     */
+    void close();
+
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LedgerEntry.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LedgerEntry.java
index 97fe377..43bdc34 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LedgerEntry.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LedgerEntry.java
@@ -23,7 +23,6 @@ package org.apache.bookkeeper.client.api;
 import io.netty.buffer.ByteBuf;
 import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public;
 import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
-import org.apache.bookkeeper.conf.ClientConfiguration;
 
 /**
  * An entry.
@@ -32,7 +31,7 @@ import org.apache.bookkeeper.conf.ClientConfiguration;
  */
 @Public
 @Unstable
-public interface LedgerEntry {
+public interface LedgerEntry extends AutoCloseable {
 
     /**
      * The id of the ledger which contains the entry.
@@ -56,25 +55,31 @@ public interface LedgerEntry {
     long getLength();
 
     /**
-     * Returns the content of the entry. This method can be called only once. 
While using v2 wire protocol this method
-     * will automatically release the internal ByteBuf.
+     * Returns the content of the entry.
      *
      * @return the content of the entry
-     * @throws IllegalStateException if this method is called twice
      */
     byte[] getEntry();
 
     /**
      * Return the internal buffer that contains the entry payload.
      *
-     * <p>Note: Using v2 wire protocol it is responsibility of the caller
-     * to ensure to release the buffer after usage.
-     *
      * @return a ByteBuf which contains the data
-     *
-     * @see ClientConfiguration#setNettyUsePooledBuffers(boolean)
-     * @throws IllegalStateException if the entry has been retrieved by {@link 
#getEntry()}
      */
     ByteBuf getEntryBuffer();
 
+    /**
+     * Returns a duplicate of this entry.
+     *
+     * <p>This call will retain a slice of the underlying byte buffer.
+     *
+     * @return a duplicated ledger entry.
+     */
+    LedgerEntry duplicate();
+
+    /**
+     * {@inheritDoc}
+     */
+    void close();
+
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LastConfirmedAndEntryImpl.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LastConfirmedAndEntryImpl.java
index 4ed78f9..8f1924a 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LastConfirmedAndEntryImpl.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LastConfirmedAndEntryImpl.java
@@ -20,6 +20,8 @@
  */
 package org.apache.bookkeeper.client.impl;
 
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
 import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
 import org.apache.bookkeeper.client.api.LedgerEntry;
 
@@ -29,19 +31,49 @@ import org.apache.bookkeeper.client.api.LedgerEntry;
  */
 public class LastConfirmedAndEntryImpl implements LastConfirmedAndEntry {
 
-    private final Long lac;
-    private final LedgerEntry entry;
+    private static final Recycler<LastConfirmedAndEntryImpl> RECYCLER = new 
Recycler<LastConfirmedAndEntryImpl>() {
+        @Override
+        protected LastConfirmedAndEntryImpl 
newObject(Handle<LastConfirmedAndEntryImpl> handle) {
+            return new LastConfirmedAndEntryImpl(handle);
+        }
+    };
 
-    public LastConfirmedAndEntryImpl(Long lac, LedgerEntry entry) {
-        this.lac = lac;
-        this.entry = entry;
+    /**
+     * Obtains an instance from the recycler and fills it with the given values.
+     *
+     * @param lac the LastAddConfirmed entry id
+     * @param entry the entry that was read, or {@code null} if no entry is available
+     * @return the populated instance; {@link #close()} returns it to the recycler
+     */
+    public static LastConfirmedAndEntryImpl create(long lac, 
org.apache.bookkeeper.client.LedgerEntry entry) {
+        LastConfirmedAndEntryImpl entryImpl = RECYCLER.get();
+        entryImpl.lac = lac;
+        if (null == entry) {
+            // No entry accompanies the LAC (e.g. the read failed); do not dereference it.
+            entryImpl.entry = null;
+        } else {
+            entryImpl.entry = LedgerEntryImpl.create(
+                entry.getLedgerId(),
+                entry.getEntryId(),
+                entry.getLength(),
+                entry.getEntryBuffer());
+        }
+        return entryImpl;
+    }
+
+    private final Handle<LastConfirmedAndEntryImpl> recycleHandle;
+    private long lac;
+    private LedgerEntry entry;
+
+    public LastConfirmedAndEntryImpl(Handle<LastConfirmedAndEntryImpl> handle) 
{
+        this.recycleHandle = handle;
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
-    public Long getLastAddConfirmed() {
+    public long getLastAddConfirmed() {
         return lac;
     }
 
@@ -60,4 +92,17 @@ public class LastConfirmedAndEntryImpl implements 
LastConfirmedAndEntry {
     public LedgerEntry getEntry() {
         return entry;
     }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void close() {
+        this.lac = -1L;
+        if (null != entry) {
+            entry.close();
+            entry = null;
+        }
+        recycleHandle.recycle(this);
+    }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LedgerEntryImpl.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LedgerEntryImpl.java
new file mode 100644
index 0000000..b90f299
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LedgerEntryImpl.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.client.impl;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+import io.netty.util.ReferenceCountUtil;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+
+/**
+ * Ledger entry. It's a simple tuple containing the ledger id, the entry-id, and
+ * the entry content.
+ */
+public class LedgerEntryImpl implements LedgerEntry {
+
+    private static final Recycler<LedgerEntryImpl> RECYCLER = new 
Recycler<LedgerEntryImpl>() {
+        @Override
+        protected LedgerEntryImpl newObject(Handle<LedgerEntryImpl> handle) {
+            return new LedgerEntryImpl(handle);
+        }
+    };
+
+    public static LedgerEntryImpl create(long ledgerId,
+                                         long entryId) {
+        LedgerEntryImpl entry = RECYCLER.get();
+        entry.ledgerId = ledgerId;
+        entry.entryId = entryId;
+        return entry;
+    }
+
+    public static LedgerEntryImpl create(long ledgerId,
+                                         long entryId,
+                                         long length,
+                                         ByteBuf buf) {
+        LedgerEntryImpl entry = RECYCLER.get();
+        entry.ledgerId = ledgerId;
+        entry.entryId = entryId;
+        entry.length = length;
+        entry.entryBuf = buf;
+        return entry;
+    }
+
+    public static LedgerEntryImpl duplicate(LedgerEntry entry) {
+        return create(
+            entry.getLedgerId(),
+            entry.getEntryId(),
+            entry.getLength(),
+            entry.getEntryBuffer().retainedSlice());
+    }
+
+    private final Handle<LedgerEntryImpl> recycleHandle;
+    private long ledgerId;
+    private long entryId;
+    private long length;
+    private ByteBuf entryBuf;
+
+    private LedgerEntryImpl(Handle<LedgerEntryImpl> handle) {
+        this.recycleHandle = handle;
+    }
+
+    public void setEntryId(long entryId) {
+        this.entryId = entryId;
+    }
+
+    public void setLength(long length) {
+        this.length = length;
+    }
+
+    public void setEntryBuf(ByteBuf buf) {
+        this.entryBuf = buf;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public long getLedgerId() {
+        return ledgerId;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public long getEntryId() {
+        return entryId;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public long getLength() {
+        return length;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public byte[] getEntry() {
+        return ByteBufUtil.getBytes(entryBuf);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public ByteBuf getEntryBuffer() {
+        return entryBuf;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public LedgerEntryImpl duplicate() {
+        return duplicate(this);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void close() {
+        recycle();
+    }
+
+    private void recycle() {
+        this.ledgerId = -1L;
+        this.entryId = -1L;
+        this.length = -1L;
+        ReferenceCountUtil.release(entryBuf);
+        this.entryBuf = null;
+        recycleHandle.recycle(this);
+    }
+}
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
index 91ba830..8fb5c02 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
@@ -53,7 +53,6 @@ import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.junit.After;
 import org.junit.Before;
-import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
@@ -336,13 +335,13 @@ public abstract class MockBookKeeperTestCase {
                 fencedLedgers.add(ledgerId);
                 MockEntry mockEntry = getMockLedgerEntry(ledgerId, 
bookieSocketAddress, entryId);
                 if (mockEntry != null) {
-                    LOG.info("readEntryAndFenceLedger - found mock entry {}@{} 
at {}", ledgerId, entryId, bookieSocketAddress);
+                    LOG.info("readEntryAndFenceLedger - found mock entry {}@{} 
at {}", entryId, ledgerId, bookieSocketAddress);
                     ByteBuf entry = 
macManager.computeDigestAndPackageForSending(entryId, 
mockEntry.lastAddConfirmed,
                         mockEntry.payload.length, 
Unpooled.wrappedBuffer(mockEntry.payload));
                     callback.readEntryComplete(BKException.Code.OK, ledgerId, 
entryId, Unpooled.copiedBuffer(entry), args[5]);
                     entry.release();
                 } else {
-                    LOG.info("readEntryAndFenceLedger - no such mock entry 
{}@{} at {}", ledgerId, entryId, bookieSocketAddress);
+                    LOG.info("readEntryAndFenceLedger - no such mock entry 
{}@{} at {}", entryId, ledgerId, bookieSocketAddress);
                     
callback.readEntryComplete(BKException.Code.NoSuchEntryException, ledgerId, 
entryId, null, args[5]);
                 }
             });
@@ -361,13 +360,13 @@ public abstract class MockBookKeeperTestCase {
                 DigestManager macManager = new CRC32DigestManager(ledgerId);
                 MockEntry mockEntry = getMockLedgerEntry(ledgerId, 
bookieSocketAddress, entryId);
                 if (mockEntry != null) {
-                    LOG.info("readEntry - found mock entry {}@{} at {}", 
ledgerId, entryId, bookieSocketAddress);
+                    LOG.info("readEntry - found mock entry {}@{} at {}", 
entryId, ledgerId, bookieSocketAddress);
                     ByteBuf entry = 
macManager.computeDigestAndPackageForSending(entryId,
                         mockEntry.lastAddConfirmed, mockEntry.payload.length, 
Unpooled.wrappedBuffer(mockEntry.payload));
                     callback.readEntryComplete(BKException.Code.OK, ledgerId, 
entryId, Unpooled.copiedBuffer(entry), args[4]);
                     entry.release();
                 } else {
-                    LOG.info("readEntry - no such mock entry {}@{} at {}", 
ledgerId, entryId, bookieSocketAddress);
+                    LOG.info("readEntry - no such mock entry {}@{} at {}", 
entryId, ledgerId, bookieSocketAddress);
                     
callback.readEntryComplete(BKException.Code.NoSuchEntryException, ledgerId, 
entryId, null, args[4]);
                 }
             });
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
index 17ba6f1..eff8c3a 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
@@ -20,13 +20,20 @@
  */
 package org.apache.bookkeeper.client;
 
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
-import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import org.apache.bookkeeper.client.BKException.Code;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
@@ -34,10 +41,6 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Enumeration;
-import java.util.concurrent.CountDownLatch;
-
 /**
  * Unit tests for parallel reading
  */
@@ -63,39 +66,6 @@ public class TestParallelRead extends 
BookKeeperClusterTestCase {
         return lh.getId();
     }
 
-    static class LatchCallback implements ReadCallback {
-
-        final CountDownLatch l = new CountDownLatch(1);
-        int rc = -0x1314;
-        Enumeration<LedgerEntry> entries;
-
-        Enumeration<LedgerEntry> getEntries() {
-            return entries;
-        }
-
-        int getRc() {
-            return rc;
-        }
-
-        @Override
-        public void readComplete(int rc, LedgerHandle lh, 
Enumeration<LedgerEntry> seq, Object ctx) {
-            this.rc = rc;
-            this.entries = seq;
-            l.countDown();
-        }
-
-        void expectSuccess() throws Exception {
-            l.await();
-            assertTrue(BKException.Code.OK == rc);
-        }
-
-        void expectFail() throws Exception {
-            l.await();
-            assertFalse(BKException.Code.OK == rc);
-        }
-
-    }
-
     @Test
     public void testNormalParallelRead() throws Exception {
         int numEntries = 10;
@@ -105,34 +75,34 @@ public class TestParallelRead extends 
BookKeeperClusterTestCase {
 
         // read single entry
         for (int i = 0; i < numEntries; i++) {
-            LatchCallback latch = new LatchCallback();
             PendingReadOp readOp =
-                    new PendingReadOp(lh, lh.bk.scheduler, i, i, latch, null);
-            readOp.parallelRead(true).initiate();
-            latch.expectSuccess();
-            Enumeration<LedgerEntry> entries = latch.getEntries();
-            assertNotNull(entries);
-            assertTrue(entries.hasMoreElements());
-            LedgerEntry entry = entries.nextElement();
+                    new PendingReadOp(lh, lh.bk.scheduler, i, i);
+            readOp.parallelRead(true).submit();
+            Iterable<LedgerEntry> iterable = readOp.future().get();
+            assertNotNull(iterable);
+            Iterator<LedgerEntry> entries = iterable.iterator();
+            assertTrue(entries.hasNext());
+            LedgerEntry entry = entries.next();
             assertNotNull(entry);
             assertEquals(i, Integer.parseInt(new String(entry.getEntry())));
-            assertFalse(entries.hasMoreElements());
+            entry.close();
+            assertFalse(entries.hasNext());
         }
 
         // read multiple entries
-        LatchCallback latch = new LatchCallback();
         PendingReadOp readOp =
-                new PendingReadOp(lh, lh.bk.scheduler, 0, numEntries - 1, 
latch, null);
-        readOp.parallelRead(true).initiate();
-        latch.expectSuccess();
-        Enumeration<LedgerEntry> entries = latch.getEntries();
-        assertNotNull(entries);
+                new PendingReadOp(lh, lh.bk.scheduler, 0, numEntries - 1);
+        readOp.parallelRead(true).submit();
+        Iterable<LedgerEntry> iterable = readOp.future().get();
+        assertNotNull(iterable);
+        Iterator<LedgerEntry> iterator = iterable.iterator();
 
         int numReads = 0;
-        while (entries.hasMoreElements()) {
-            LedgerEntry entry = entries.nextElement();
+        while (iterator.hasNext()) {
+            LedgerEntry entry = iterator.next();
             assertNotNull(entry);
             assertEquals(numReads, Integer.parseInt(new 
String(entry.getEntry())));
+            entry.close();
             ++numReads;
         }
         assertEquals(numEntries, numReads);
@@ -140,6 +110,17 @@ public class TestParallelRead extends 
BookKeeperClusterTestCase {
         lh.close();
     }
 
+    private static <T> void expectFail(CompletableFuture<T> future, int 
expectedRc) {
+        try {
+            result(future);
+            fail("Expect to fail");
+        } catch (Exception e) {
+            assertTrue(e instanceof BKException);
+            BKException bke = (BKException) e;
+            assertEquals(expectedRc, bke.getCode());
+        }
+    }
+
     @Test
     public void testParallelReadMissingEntries() throws Exception {
         int numEntries = 10;
@@ -148,19 +129,15 @@ public class TestParallelRead extends 
BookKeeperClusterTestCase {
         LedgerHandle lh = bkc.openLedger(id, digestType, passwd);
 
         // read single entry
-        LatchCallback latch = new LatchCallback();
         PendingReadOp readOp =
-                new PendingReadOp(lh, lh.bk.scheduler, 11, 11, latch, null);
-        readOp.parallelRead(true).initiate();
-        latch.expectFail();
-        assertEquals(BKException.Code.NoSuchEntryException, latch.getRc());
+                new PendingReadOp(lh, lh.bk.scheduler, 11, 11);
+        readOp.parallelRead(true).submit();
+        expectFail(readOp.future(), Code.NoSuchEntryException);
 
         // read multiple entries
-        latch = new LatchCallback();
-        readOp = new PendingReadOp(lh, lh.bk.scheduler, 8, 11, latch, null);
-        readOp.parallelRead(true).initiate();
-        latch.expectFail();
-        assertEquals(BKException.Code.NoSuchEntryException, latch.getRc());
+        readOp = new PendingReadOp(lh, lh.bk.scheduler, 8, 11);
+        readOp.parallelRead(true).submit();
+        expectFail(readOp.future(), Code.NoSuchEntryException);
 
         lh.close();
     }
@@ -186,13 +163,11 @@ public class TestParallelRead extends 
BookKeeperClusterTestCase {
         sleepBookie(ensemble.get(0), latch1);
         sleepBookie(ensemble.get(1), latch2);
 
-        LatchCallback latchCallback = new LatchCallback();
         PendingReadOp readOp =
-                new PendingReadOp(lh, lh.bk.scheduler, 10, 10, latchCallback, 
null);
-        readOp.parallelRead(true).initiate();
+                new PendingReadOp(lh, lh.bk.scheduler, 10, 10);
+        readOp.parallelRead(true).submit();
         // would fail immediately if found missing entries don't cover ack 
quorum
-        latchCallback.expectFail();
-        assertEquals(BKException.Code.NoSuchEntryException, 
latchCallback.getRc());
+        expectFail(readOp.future(), Code.NoSuchEntryException);
         latch1.countDown();
         latch2.countDown();
 
@@ -220,17 +195,18 @@ public class TestParallelRead extends 
BookKeeperClusterTestCase {
         killBookie(ensemble.get(1));
 
         // read multiple entries
-        LatchCallback latch = new LatchCallback();
         PendingReadOp readOp =
-                new PendingReadOp(lh, lh.bk.scheduler, 0, numEntries - 1, 
latch, null);
-        readOp.parallelRead(true).initiate();
-        latch.expectSuccess();
-        Enumeration<LedgerEntry> entries = latch.getEntries();
-        assertNotNull(entries);
+                new PendingReadOp(lh, lh.bk.scheduler, 0, numEntries - 1);
+        readOp.parallelRead(true).submit();
+        Iterable<LedgerEntry> iterable = readOp.future().get();
+        assertNotNull(iterable);
+        Iterator<LedgerEntry> entries = iterable.iterator();
 
         int numReads = 0;
-        while (entries.hasMoreElements()) {
-            LedgerEntry entry = entries.nextElement();
+        while (entries.hasNext()) {
+            LedgerEntry entry = entries.next();
             assertNotNull(entry);
             assertEquals(numReads, Integer.parseInt(new 
String(entry.getEntry())));
+            // the entry owns its buffer now; close it like the other read loops in this test
+            entry.close();
             ++numReads;
@@ -262,12 +238,10 @@ public class TestParallelRead extends 
BookKeeperClusterTestCase {
         killBookie(ensemble.get(2));
 
         // read multiple entries
-        LatchCallback latch = new LatchCallback();
         PendingReadOp readOp =
-                new PendingReadOp(lh, lh.bk.scheduler, 0, numEntries - 1, 
latch, null);
-        readOp.parallelRead(true).initiate();
-        latch.expectFail();
-        assertEquals(BKException.Code.BookieHandleNotAvailableException, 
latch.getRc());
+                new PendingReadOp(lh, lh.bk.scheduler, 0, numEntries - 1);
+        readOp.parallelRead(true).submit();
+        expectFail(readOp.future(), Code.BookieHandleNotAvailableException);
 
         lh.close();
         newBk.close();
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadEntryListener.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadEntryListener.java
index 11e46e9..fb9a747 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadEntryListener.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadEntryListener.java
@@ -24,6 +24,10 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryListener;
@@ -32,11 +36,6 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
 /**
  * Unit tests for {@link 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryListener}.
  */
@@ -87,11 +86,17 @@ public class TestReadEntryListener extends 
BookKeeperClusterTestCase {
 
         @Override
         public void onEntryComplete(int rc, LedgerHandle lh, LedgerEntry 
entry, Object ctx) {
-            if (nextEntryId != entry.getEntryId()) {
-                inOrder = false;
+            long entryId;
+            if (BKException.Code.OK == rc) {
+                if (nextEntryId != entry.getEntryId()) {
+                    inOrder = false;
+                }
+                entryId = entry.getEntryId();
+            } else {
+                entryId = nextEntryId;
             }
+            resultCodes.put(entryId, new EntryWithRC(rc, entry));
             ++nextEntryId;
-            resultCodes.put(entry.getEntryId(), new EntryWithRC(rc, entry));
             l.countDown();
         }
 
@@ -115,7 +120,7 @@ public class TestReadEntryListener extends 
BookKeeperClusterTestCase {
             LatchListener listener = new LatchListener(i, 1);
             ListenerBasedPendingReadOp readOp =
                     new ListenerBasedPendingReadOp(lh, lh.bk.scheduler, i, i, 
listener, null);
-            readOp.parallelRead(parallelRead).initiate();
+            readOp.parallelRead(parallelRead).submit();
             listener.expectComplete();
             assertEquals(1, listener.resultCodes.size());
             EntryWithRC entry = listener.resultCodes.get((long) i);
@@ -129,7 +134,7 @@ public class TestReadEntryListener extends 
BookKeeperClusterTestCase {
         LatchListener listener = new LatchListener(0L, numEntries);
         ListenerBasedPendingReadOp readOp =
                 new ListenerBasedPendingReadOp(lh, lh.bk.scheduler, 0, 
numEntries - 1, listener, null);
-        readOp.parallelRead(parallelRead).initiate();
+        readOp.parallelRead(parallelRead).submit();
         listener.expectComplete();
         assertEquals(numEntries, listener.resultCodes.size());
         for (int i = 0; i < numEntries; i++) {
@@ -163,7 +168,7 @@ public class TestReadEntryListener extends 
BookKeeperClusterTestCase {
         LatchListener listener = new LatchListener(11L, 1);
         ListenerBasedPendingReadOp readOp =
                 new ListenerBasedPendingReadOp(lh, lh.bk.scheduler, 11, 11, 
listener, null);
-        readOp.parallelRead(parallelRead).initiate();
+        readOp.parallelRead(parallelRead).submit();
         listener.expectComplete();
         assertEquals(1, listener.resultCodes.size());
         EntryWithRC entry = listener.resultCodes.get(11L);
@@ -174,7 +179,7 @@ public class TestReadEntryListener extends 
BookKeeperClusterTestCase {
         // read multiple missing entries
         listener = new LatchListener(11L, 3);
         readOp = new ListenerBasedPendingReadOp(lh, lh.bk.scheduler, 11, 13, 
listener, null);
-        readOp.parallelRead(parallelRead).initiate();
+        readOp.parallelRead(parallelRead).submit();
         listener.expectComplete();
         assertEquals(3, listener.resultCodes.size());
         assertTrue(listener.isInOrder());
@@ -188,7 +193,7 @@ public class TestReadEntryListener extends 
BookKeeperClusterTestCase {
         // read multiple entries with missing entries
         listener = new LatchListener(5L, 10);
         readOp = new ListenerBasedPendingReadOp(lh, lh.bk.scheduler, 5L, 14L, 
listener, null);
-        readOp.parallelRead(parallelRead).initiate();
+        readOp.parallelRead(parallelRead).submit();
         listener.expectComplete();
         assertEquals(10, listener.resultCodes.size());
         assertTrue(listener.isInOrder());
@@ -234,7 +239,7 @@ public class TestReadEntryListener extends 
BookKeeperClusterTestCase {
         LatchListener listener = new LatchListener(0L, numEntries);
         ListenerBasedPendingReadOp readOp =
                 new ListenerBasedPendingReadOp(lh, lh.bk.scheduler, 0, 
numEntries - 1, listener, null);
-        readOp.parallelRead(parallelRead).initiate();
+        readOp.parallelRead(parallelRead).submit();
         listener.expectComplete();
         assertEquals(numEntries, listener.resultCodes.size());
         for (int i = 0; i < numEntries; i++) {
@@ -265,7 +270,7 @@ public class TestReadEntryListener extends 
BookKeeperClusterTestCase {
         LedgerHandle lh = bkc.openLedger(id, digestType, passwd);
 
         ArrayList<BookieSocketAddress> ensemble =
-                lh.getLedgerMetadata().getEnsemble(5);
+            lh.getLedgerMetadata().getEnsemble(5);
         // kill bookies
         killBookie(ensemble.get(0));
         killBookie(ensemble.get(1));
@@ -274,8 +279,8 @@ public class TestReadEntryListener extends 
BookKeeperClusterTestCase {
         // read multiple entries
         LatchListener listener = new LatchListener(0L, numEntries);
         ListenerBasedPendingReadOp readOp =
-                new ListenerBasedPendingReadOp(lh, lh.bk.scheduler, 0, 
numEntries - 1, listener, null);
-        readOp.parallelRead(parallelRead).initiate();
+            new ListenerBasedPendingReadOp(lh, lh.bk.scheduler, 0, numEntries 
- 1, listener, null);
+        readOp.parallelRead(parallelRead).submit();
         listener.expectComplete();
         assertEquals(numEntries, listener.resultCodes.size());
         for (int i = 0; i < numEntries; i++) {
@@ -294,11 +299,11 @@ public class TestReadEntryListener extends 
BookKeeperClusterTestCase {
 
     @Test
     public void testReadFailureWithFailedBookiesEnableParallelRead() throws 
Exception {
-        readWithFailedBookiesTest(true);
+        readFailureWithFailedBookiesTest(true);
     }
 
     @Test
     public void testReadFailureWithFailedBookiesDisableParallelRead() throws 
Exception {
-        readWithFailedBookiesTest(false);
+        readFailureWithFailedBookiesTest(false);
     }
 }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
index 5ca752a..acfeb8c 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
@@ -20,6 +20,10 @@
  */
 package org.apache.bookkeeper.client;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Enumeration;
@@ -34,8 +38,6 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.junit.Assert.*;
-
 /**
  * This unit test tests ledger fencing;
  *
@@ -292,9 +294,7 @@ public class TestSpeculativeRead extends 
BookKeeperClusterTestCase {
         secondHostOnly.set(1, true);
         PendingReadOp.LedgerEntryRequest req0 = null, req2 = null, req4 = null;
         try {
-            LatchCallback latch0 = new LatchCallback();
-            PendingReadOp op = new PendingReadOp(l, bkspec.scheduler,
-                                                 0, 5, latch0, null);
+            PendingReadOp op = new PendingReadOp(l, bkspec.scheduler, 0, 5);
 
             // if we've already heard from all hosts,
             // we only send the initial read
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java
index 8a3d68f..bbaa358 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java
@@ -207,8 +207,9 @@ public class BookKeeperApiTest extends 
MockBookKeeperTestCase {
             // test readLastAddConfirmedAndEntry
             LastConfirmedAndEntry lastConfirmedAndEntry =
                 result(reader.readLastAddConfirmedAndEntry(0, 999, false));
-            assertEquals(2, 
lastConfirmedAndEntry.getLastAddConfirmed().intValue());
+            assertEquals(2L, lastConfirmedAndEntry.getLastAddConfirmed());
             assertArrayEquals(data, 
lastConfirmedAndEntry.getEntry().getEntry());
+            lastConfirmedAndEntry.close();
         }
     }
 

-- 
To stop receiving notification emails like this one, please contact
"[email protected]" <[email protected]>.

Reply via email to