This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch branch-4.6
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/branch-4.6 by this push:
new 9f73b61 ISSUE #731: refine LedgerEntry interface and implementation
9f73b61 is described below
commit 9f73b617af15b153130832a77c4b632ff095a80c
Author: Jia Zhai <[email protected]>
AuthorDate: Wed Nov 22 09:11:08 2017 +0800
ISSUE #731: refine LedgerEntry interface and implementation
In the LedgerEntry interface, getEntry() and getEntryBuffer() have completely
different behaviours. getEntry() releases the bytebuf automatically, while
getEntryBuffer() returns the bytebuf (if you don't call getEntry(), you are
responsible for releasing the entry buffer).
In this change:
make getEntry() re-entrant (callable more than once); make LedgerEntry implement
AutoCloseable; LedgerEntry is responsible for releasing the bytebuf it is holding.
Descriptions of the changes in this PR:
1. add `close` and `duplicate` in `api.LedgerEntry`,
1. LedgerEntryImpl implements `api.LedgerEntry`,
1. `client.LedgerEntry` doesn't implement the new api; it wraps an
`api.LedgerEntry`,
1. add `close` in `api.LastConfirmedAndEntry`,
1. fix build and test errors.
Author: Jia Zhai <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo
<[email protected]>
This closes #738 from zhaijack/issue-731, closes #731
(cherry picked from commit 87d1afcc329d7261f19a6310ca9a7cf2acce4dee)
Signed-off-by: Jia Zhai <[email protected]>
---
.../org/apache/bookkeeper/client/LedgerEntry.java | 22 ++-
.../org/apache/bookkeeper/client/LedgerHandle.java | 82 +++++++++--
.../apache/bookkeeper/client/LedgerRecoveryOp.java | 2 -
.../client/ListenerBasedPendingReadOp.java | 28 ++--
.../apache/bookkeeper/client/PendingReadOp.java | 105 +++++++-------
.../client/ReadLastConfirmedAndEntryOp.java | 45 +++---
.../bookkeeper/client/SyncCallbackUtils.java | 32 +----
.../client/api/LastConfirmedAndEntry.java | 9 +-
.../apache/bookkeeper/client/api/LedgerEntry.java | 27 ++--
.../client/impl/LastConfirmedAndEntryImpl.java | 45 +++++-
.../bookkeeper/client/impl/LedgerEntryImpl.java | 155 +++++++++++++++++++++
.../bookkeeper/client/MockBookKeeperTestCase.java | 9 +-
.../apache/bookkeeper/client/TestParallelRead.java | 138 ++++++++----------
.../bookkeeper/client/TestReadEntryListener.java | 43 +++---
.../bookkeeper/client/TestSpeculativeRead.java | 10 +-
.../bookkeeper/client/api/BookKeeperApiTest.java | 3 +-
16 files changed, 482 insertions(+), 273 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java
index 4e39e51..8cb31f3 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java
@@ -23,8 +23,8 @@ package org.apache.bookkeeper.client;
import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
-
import java.io.InputStream;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.conf.ClientConfiguration;
/**
@@ -32,30 +32,28 @@ import org.apache.bookkeeper.conf.ClientConfiguration;
* the entry content.
*
*/
-public class LedgerEntry
- implements org.apache.bookkeeper.client.api.LedgerEntry {
+public class LedgerEntry {
final long ledgerId;
- long entryId;
- long length;
+ final long entryId;
+ final long length;
ByteBuf data;
- LedgerEntry(long lId, long eId) {
- this.ledgerId = lId;
- this.entryId = eId;
+ LedgerEntry(LedgerEntryImpl entry) {
+ this.ledgerId = entry.getLedgerId();
+ this.entryId = entry.getEntryId();
+ this.length = entry.getLength();
+ this.data = entry.getEntryBuffer().retain();
}
- @Override
public long getLedgerId() {
return ledgerId;
}
- @Override
public long getEntryId() {
return entryId;
}
- @Override
public long getLength() {
return length;
}
@@ -68,7 +66,6 @@ public class LedgerEntry
* @return the content of the entry
* @throws IllegalStateException if this method is called twice
*/
- @Override
public byte[] getEntry() {
Preconditions.checkState(null != data, "entry content can be accessed
only once");
byte[] entry = new byte[data.readableBytes()];
@@ -105,7 +102,6 @@ public class LedgerEntry
* @throws IllegalStateException if the entry has been retrieved by {@link
#getEntry()}
* or {@link #getEntryInputStream()}.
*/
- @Override
public ByteBuf getEntryBuffer() {
Preconditions.checkState(null != data, "entry content has been
retrieved" +
" by #getEntry or #getEntryInputStream");
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 60ad65f..49962a9 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -20,11 +20,14 @@
*/
package org.apache.bookkeeper.client;
+import static
org.apache.bookkeeper.client.api.BKException.Code.ClientClosedException;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Iterators;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.RateLimiter;
import io.netty.buffer.ByteBuf;
@@ -49,16 +52,21 @@ import
org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
+import org.apache.bookkeeper.client.BKException.BKIncorrectParameterException;
+import org.apache.bookkeeper.client.BKException.BKReadException;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.SyncCallbackUtils.FutureReadLastConfirmed;
import
org.apache.bookkeeper.client.SyncCallbackUtils.FutureReadLastConfirmedAndEntry;
-import org.apache.bookkeeper.client.SyncCallbackUtils.FutureReadResult;
import org.apache.bookkeeper.client.SyncCallbackUtils.SyncAddCallback;
import org.apache.bookkeeper.client.SyncCallbackUtils.SyncCloseCallback;
import org.apache.bookkeeper.client.SyncCallbackUtils.SyncReadCallback;
import
org.apache.bookkeeper.client.SyncCallbackUtils.SyncReadLastConfirmedCallback;
+import org.apache.bookkeeper.client.api.BKException.Code;
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
import org.apache.bookkeeper.client.api.WriteHandle;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+import org.apache.bookkeeper.common.concurrent.FutureEventListener;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
@@ -69,6 +77,7 @@ import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.Gauge;
import
org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback;
import org.apache.bookkeeper.util.SafeRunnable;
+import org.apache.commons.collections4.IteratorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -626,9 +635,20 @@ public class LedgerHandle implements WriteHandle {
*/
@Override
public
CompletableFuture<Iterable<org.apache.bookkeeper.client.api.LedgerEntry>>
read(long firstEntry, long lastEntry) {
- FutureReadResult result = new FutureReadResult();
- asyncReadEntries(firstEntry, lastEntry, result, null);
- return result;
+ // Little sanity check
+ if (firstEntry < 0 || firstEntry > lastEntry) {
+ LOG.error("IncorrectParameterException on ledgerId:{}
firstEntry:{} lastEntry:{}",
+ new Object[] { ledgerId, firstEntry, lastEntry });
+ return FutureUtils.exception(new BKIncorrectParameterException());
+ }
+
+ if (lastEntry > lastAddConfirmed) {
+ LOG.error("ReadException on ledgerId:{} firstEntry:{}
lastEntry:{}",
+ new Object[] { ledgerId, firstEntry, lastEntry });
+ return FutureUtils.exception(new BKReadException());
+ }
+
+ return readEntriesInternalAsync(firstEntry, lastEntry);
}
/**
@@ -656,14 +676,58 @@ public class LedgerHandle implements WriteHandle {
*/
@Override
public
CompletableFuture<Iterable<org.apache.bookkeeper.client.api.LedgerEntry>>
readUnconfirmed(long firstEntry, long lastEntry) {
- FutureReadResult result = new FutureReadResult();
- asyncReadUnconfirmedEntries(firstEntry, lastEntry, result, null);
- return result;
+ // Little sanity check
+ if (firstEntry < 0 || firstEntry > lastEntry) {
+ LOG.error("IncorrectParameterException on ledgerId:{}
firstEntry:{} lastEntry:{}",
+ new Object[] { ledgerId, firstEntry, lastEntry });
+ return FutureUtils.exception(new BKIncorrectParameterException());
+ }
+
+ return readEntriesInternalAsync(firstEntry, lastEntry);
}
void asyncReadEntriesInternal(long firstEntry, long lastEntry,
ReadCallback cb, Object ctx) {
- new PendingReadOp(this, bk.getScheduler(),
- firstEntry, lastEntry, cb, ctx).initiate();
+ if(!bk.isClosed()) {
+ readEntriesInternalAsync(firstEntry, lastEntry)
+ .whenCompleteAsync(new
FutureEventListener<Iterable<org.apache.bookkeeper.client.api.LedgerEntry>>() {
+ @Override
+ public void
onSuccess(Iterable<org.apache.bookkeeper.client.api.LedgerEntry> iterable) {
+ cb.readComplete(
+ Code.OK,
+ LedgerHandle.this,
+ IteratorUtils.asEnumeration(
+ Iterators.transform(iterable.iterator(), le ->
{
+ LedgerEntry entry = new
LedgerEntry((LedgerEntryImpl) le);
+ le.close();
+ return entry;
+ })),
+ ctx);
+ }
+
+ @Override
+ public void onFailure(Throwable cause) {
+ if (cause instanceof BKException) {
+ BKException bke = (BKException) cause;
+ cb.readComplete(bke.getCode(), LedgerHandle.this,
null, ctx);
+ } else {
+ cb.readComplete(Code.UnexpectedConditionException,
LedgerHandle.this, null, ctx);
+ }
+ }
+ }, bk.getMainWorkerPool().chooseThread(ledgerId));
+ } else {
+ cb.readComplete(Code.ClientClosedException, LedgerHandle.this,
null, ctx);
+ }
+ }
+
+ CompletableFuture<Iterable<org.apache.bookkeeper.client.api.LedgerEntry>>
readEntriesInternalAsync(long firstEntry,
+
long lastEntry) {
+ PendingReadOp op = new PendingReadOp(this, bk.getScheduler(),
firstEntry, lastEntry);
+ if(!bk.isClosed()) {
+ bk.getMainWorkerPool().submitOrdered(ledgerId, op);
+ } else {
+
op.future().completeExceptionally(BKException.create(ClientClosedException));
+ }
+ return op.future();
}
/**
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
index df7c84e..cde52a1 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
@@ -21,10 +21,8 @@ import com.google.common.annotations.VisibleForTesting;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
-import org.apache.bookkeeper.client.BKException.Code;
import org.apache.bookkeeper.client.DigestManager.RecoveryData;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryListener;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java
index afb21bf..290e69b 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java
@@ -20,15 +20,17 @@
*/
package org.apache.bookkeeper.client;
-import java.util.NoSuchElementException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryListener;
import org.apache.bookkeeper.util.MathUtils;
+@Slf4j
class ListenerBasedPendingReadOp extends PendingReadOp {
final ReadEntryListener listener;
+ final Object ctx;
ListenerBasedPendingReadOp(LedgerHandle lh,
ScheduledExecutorService scheduler,
@@ -53,38 +55,34 @@ class ListenerBasedPendingReadOp extends PendingReadOp {
ReadEntryListener listener,
Object ctx,
boolean isRecoveryRead) {
- super(lh, scheduler, startEntryId, endEntryId, null, ctx,
isRecoveryRead);
+ super(lh, scheduler, startEntryId, endEntryId, isRecoveryRead);
this.listener = listener;
+ this.ctx = ctx;
}
@Override
protected void submitCallback(int code) {
LedgerEntryRequest request;
- while ((request = seq.peek()) != null) {
+ while (!seq.isEmpty() && (request = seq.get(0)) != null) {
if (!request.isComplete()) {
return;
}
- seq.remove();
+ seq.remove(0);
long latencyNanos = MathUtils.elapsedNanos(requestTimeNanos);
+ LedgerEntry entry;
if (BKException.Code.OK == request.getRc()) {
readOpLogger.registerSuccessfulEvent(latencyNanos,
TimeUnit.NANOSECONDS);
+ // callback with completed entry
+ entry = new LedgerEntry(request.entryImpl);
} else {
readOpLogger.registerFailedEvent(latencyNanos,
TimeUnit.NANOSECONDS);
+ entry = null;
}
- // callback with completed entry
- listener.onEntryComplete(request.getRc(), lh, request, ctx);
+ request.close();
+ listener.onEntryComplete(request.getRc(), lh, entry, ctx);
}
// if all entries are already completed.
cancelSpeculativeTask(true);
}
- @Override
- public boolean hasMoreElements() {
- return false;
- }
-
- @Override
- public LedgerEntry nextElement() throws NoSuchElementException {
- throw new NoSuchElementException();
- }
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index 40da31c..e31c5b7 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -20,26 +20,24 @@
*/
package org.apache.bookkeeper.client;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ListenableFuture;
import io.netty.buffer.ByteBuf;
-
import java.util.ArrayList;
import java.util.BitSet;
-import java.util.Enumeration;
import java.util.HashSet;
-import java.util.NoSuchElementException;
-import java.util.Queue;
+import java.util.List;
import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.google.common.util.concurrent.ListenableFuture;
-
-import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+import org.apache.bookkeeper.common.util.SafeRunnable;
import org.apache.bookkeeper.net.BookieSocketAddress;
import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallbackCtx;
@@ -55,16 +53,15 @@ import org.slf4j.LoggerFactory;
* application as soon as it arrives rather than waiting for the whole thing.
*
*/
-class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
+class PendingReadOp implements ReadEntryCallback, SafeRunnable {
private static final Logger LOG =
LoggerFactory.getLogger(PendingReadOp.class);
- final private ScheduledExecutorService scheduler;
+ private final ScheduledExecutorService scheduler;
private ScheduledFuture<?> speculativeTask = null;
- Queue<LedgerEntryRequest> seq;
+ protected final List<LedgerEntryRequest> seq;
+ private final CompletableFuture<Iterable<LedgerEntry>> future;
Set<BookieSocketAddress> heardFromHosts;
BitSet heardFromHostsBitSet;
- ReadCallback cb;
- Object ctx;
LedgerHandle lh;
long numPendingEntries;
long startEntryId;
@@ -77,7 +74,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>,
ReadEntryCallback {
boolean parallelRead = false;
final AtomicBoolean complete = new AtomicBoolean(false);
- abstract class LedgerEntryRequest extends LedgerEntry implements
SpeculativeRequestExecutor {
+ abstract class LedgerEntryRequest implements SpeculativeRequestExecutor,
AutoCloseable {
final AtomicBoolean complete = new AtomicBoolean(false);
@@ -87,10 +84,10 @@ class PendingReadOp implements Enumeration<LedgerEntry>,
ReadEntryCallback {
final ArrayList<BookieSocketAddress> ensemble;
final DistributionSchedule.WriteSet writeSet;
+ final LedgerEntryImpl entryImpl;
LedgerEntryRequest(ArrayList<BookieSocketAddress> ensemble, long lId,
long eId) {
- super(lId, eId);
-
+ this.entryImpl = LedgerEntryImpl.create(lId, eId);
this.ensemble = ensemble;
if (lh.bk.isReorderReadSequence()) {
@@ -98,12 +95,16 @@ class PendingReadOp implements Enumeration<LedgerEntry>,
ReadEntryCallback {
.reorderReadSequence(
ensemble,
lh.bookieFailureHistory.asMap(),
- lh.distributionSchedule.getWriteSet(entryId));
+ lh.distributionSchedule.getWriteSet(eId));
} else {
- writeSet = lh.distributionSchedule.getWriteSet(entryId);
+ writeSet = lh.distributionSchedule.getWriteSet(eId);
}
}
+ public void close() {
+ entryImpl.close();
+ }
+
/**
* Execute the read request.
*/
@@ -124,7 +125,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>,
ReadEntryCallback {
boolean complete(int bookieIndex, BookieSocketAddress host, final
ByteBuf buffer) {
ByteBuf content;
try {
- content = lh.macManager.verifyDigestAndReturnData(entryId,
buffer);
+ content =
lh.macManager.verifyDigestAndReturnData(entryImpl.getEntryId(), buffer);
} catch (BKDigestMatchException e) {
logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch",
BKException.Code.DigestMatchException);
buffer.release();
@@ -137,8 +138,8 @@ class PendingReadOp implements Enumeration<LedgerEntry>,
ReadEntryCallback {
* The length is a long and it is the last field of the
metadata of an entry.
* Consequently, we have to subtract 8 from METADATA_LENGTH to
get the length.
*/
- length = buffer.getLong(DigestManager.METADATA_LENGTH - 8);
- data = content;
+
entryImpl.setLength(buffer.getLong(DigestManager.METADATA_LENGTH - 8));
+ entryImpl.setEntryBuf(content);
writeSet.recycle();
return true;
} else {
@@ -195,12 +196,12 @@ class PendingReadOp implements Enumeration<LedgerEntry>,
ReadEntryCallback {
++numMissedEntryReads;
if (LOG.isDebugEnabled()) {
LOG.debug("No such entry found on bookie. L{} E{} bookie:
{}",
- new Object[] { lh.ledgerId, entryId, host });
+ new Object[] { lh.ledgerId, entryImpl.getEntryId(),
host });
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(errMsg + " while reading L{} E{} from bookie:
{}",
- new Object[]{lh.ledgerId, entryId, host});
+ new Object[]{lh.ledgerId, entryImpl.getEntryId(),
host});
}
}
}
@@ -235,7 +236,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>,
ReadEntryCallback {
@Override
public String toString() {
- return String.format("L%d-E%d", ledgerId, entryId);
+ return String.format("L%d-E%d", entryImpl.getLedgerId(),
entryImpl.getEntryId());
}
/**
@@ -423,16 +424,12 @@ class PendingReadOp implements Enumeration<LedgerEntry>,
ReadEntryCallback {
PendingReadOp(LedgerHandle lh,
ScheduledExecutorService scheduler,
long startEntryId,
- long endEntryId,
- ReadCallback cb,
- Object ctx) {
+ long endEntryId) {
this(
lh,
scheduler,
startEntryId,
endEntryId,
- cb,
- ctx,
false);
}
@@ -440,12 +437,9 @@ class PendingReadOp implements Enumeration<LedgerEntry>,
ReadEntryCallback {
ScheduledExecutorService scheduler,
long startEntryId,
long endEntryId,
- ReadCallback cb,
- Object ctx,
boolean isRecoveryRead) {
- seq = new ArrayBlockingQueue<LedgerEntryRequest>((int) ((endEntryId +
1) - startEntryId));
- this.cb = cb;
- this.ctx = ctx;
+ this.seq = new ArrayList<>((int) ((endEntryId + 1) - startEntryId));
+ this.future = new CompletableFuture<>();
this.lh = lh;
this.startEntryId = startEntryId;
this.endEntryId = endEntryId;
@@ -460,6 +454,10 @@ class PendingReadOp implements Enumeration<LedgerEntry>,
ReadEntryCallback {
readOpLogger = lh.bk.getReadOpLogger();
}
+ CompletableFuture<Iterable<LedgerEntry>> future() {
+ return future;
+ }
+
protected LedgerMetadata getLedgerMetadata() {
return lh.metadata;
}
@@ -476,7 +474,11 @@ class PendingReadOp implements Enumeration<LedgerEntry>,
ReadEntryCallback {
return this;
}
- public void initiate() {
+ public void submit() {
+ lh.bk.getMainWorkerPool().submitOrdered(lh.ledgerId, this);
+ }
+
+ void initiate() {
long nextEnsembleChange = startEntryId, i = startEntryId;
this.requestTimeNanos = MathUtils.nowInNano();
ArrayList<BookieSocketAddress> ensemble = null;
@@ -503,6 +505,11 @@ class PendingReadOp implements Enumeration<LedgerEntry>,
ReadEntryCallback {
}
}
+ @Override
+ public void safeRun() {
+ initiate();
+ }
+
private static class ReadContext implements ReadEntryCallbackCtx {
final int bookieIndex;
final BookieSocketAddress to;
@@ -531,7 +538,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>,
ReadEntryCallback {
lh.throttler.acquire();
}
- lh.bk.getBookieClient().readEntry(to, lh.ledgerId, entry.entryId,
+ lh.bk.getBookieClient().readEntry(to, lh.ledgerId,
entry.entryImpl.getEntryId(),
this, new ReadContext(bookieIndex, to,
entry));
}
@@ -574,37 +581,27 @@ class PendingReadOp implements Enumeration<LedgerEntry>,
ReadEntryCallback {
return;
}
+ cancelSpeculativeTask(true);
+
long latencyNanos = MathUtils.elapsedNanos(requestTimeNanos);
if (code != BKException.Code.OK) {
long firstUnread = LedgerHandle.INVALID_ENTRY_ID;
for (LedgerEntryRequest req : seq) {
if (!req.isComplete()) {
- firstUnread = req.getEntryId();
+ firstUnread = req.entryImpl.getEntryId();
break;
}
}
LOG.error("Read of ledger entry failed: L{} E{}-E{}, Heard from {}
: bitset = {}. First unread entry is {}",
new Object[] { lh.getId(), startEntryId, endEntryId,
heardFromHosts, heardFromHostsBitSet, firstUnread });
readOpLogger.registerFailedEvent(latencyNanos,
TimeUnit.NANOSECONDS);
+ // release the entries
+ seq.forEach(LedgerEntryRequest::close);
+ future.completeExceptionally(BKException.create(code));
} else {
readOpLogger.registerSuccessfulEvent(latencyNanos,
TimeUnit.NANOSECONDS);
+ future.complete(Lists.transform(seq, input -> input.entryImpl));
}
- cancelSpeculativeTask(true);
- cb.readComplete(code, lh, PendingReadOp.this, PendingReadOp.this.ctx);
- cb = null;
}
- @Override
- public boolean hasMoreElements() {
- return !seq.isEmpty();
- }
-
- @Override
- public LedgerEntry nextElement() throws NoSuchElementException {
- return seq.remove();
- }
-
- public int size() {
- return seq.size();
- }
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
index ae721d5..9ee75d9 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
@@ -24,17 +24,16 @@ import com.google.common.util.concurrent.ListenableFuture;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.BitSet;
-import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.proto.ReadLastConfirmedAndEntryContext;
import org.apache.bookkeeper.util.MathUtils;
-import org.apache.commons.lang3.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,7 +64,7 @@ class ReadLastConfirmedAndEntryOp implements
BookkeeperInternalCallbacks.ReadEnt
private long lastAddConfirmed;
private long timeOutInMillis;
- abstract class ReadLACAndEntryRequest extends LedgerEntry {
+ abstract class ReadLACAndEntryRequest implements AutoCloseable {
final AtomicBoolean complete = new AtomicBoolean(false);
@@ -76,12 +75,12 @@ class ReadLastConfirmedAndEntryOp implements
BookkeeperInternalCallbacks.ReadEnt
final ArrayList<BookieSocketAddress> ensemble;
final DistributionSchedule.WriteSet writeSet;
final DistributionSchedule.WriteSet orderedEnsemble;
+ final LedgerEntryImpl entryImpl;
ReadLACAndEntryRequest(ArrayList<BookieSocketAddress> ensemble, long
lId, long eId) {
- super(lId, eId);
-
+ this.entryImpl = LedgerEntryImpl.create(lId, eId);
this.ensemble = ensemble;
- this.writeSet = lh.distributionSchedule.getWriteSet(entryId);
+ this.writeSet = lh.distributionSchedule.getWriteSet(eId);
if (lh.bk.reorderReadSequence) {
this.orderedEnsemble =
lh.bk.placementPolicy.reorderReadLACSequence(ensemble,
lh.bookieFailureHistory.asMap(), writeSet.copy());
@@ -90,6 +89,10 @@ class ReadLastConfirmedAndEntryOp implements
BookkeeperInternalCallbacks.ReadEnt
}
}
+ public void close() {
+ entryImpl.close();
+ }
+
synchronized int getFirstError() {
return firstError;
}
@@ -125,13 +128,12 @@ class ReadLastConfirmedAndEntryOp implements
BookkeeperInternalCallbacks.ReadEnt
writeSet.recycle();
orderedEnsemble.recycle();
rc = BKException.Code.OK;
- this.entryId = entryId;
/*
* The length is a long and it is the last field of the
metadata of an entry.
* Consequently, we have to subtract 8 from METADATA_LENGTH to
get the length.
*/
- length = buffer.getLong(DigestManager.METADATA_LENGTH - 8);
- data = content;
+
entryImpl.setLength(buffer.getLong(DigestManager.METADATA_LENGTH - 8));
+ entryImpl.setEntryBuf(content);
return true;
} else {
return false;
@@ -193,13 +195,13 @@ class ReadLastConfirmedAndEntryOp implements
BookkeeperInternalCallbacks.ReadEnt
// treat these errors as failures if the node from which we
received this is part of
// the writeSet
if (this.writeSet.contains(bookieIndex)) {
- lh.registerOperationFailureOnBookie(host, entryId);
+ lh.registerOperationFailureOnBookie(host,
entryImpl.getEntryId());
}
++numMissedEntryReads;
}
if (LOG.isDebugEnabled()) {
- LOG.debug(errMsg + " while reading entry: " + entryId + "
ledgerId: " + lh.ledgerId + " from bookie: "
+ LOG.debug(errMsg + " while reading entry: " +
entryImpl.getEntryId() + " ledgerId: " + lh.ledgerId + " from bookie: "
+ host);
}
}
@@ -234,7 +236,7 @@ class ReadLastConfirmedAndEntryOp implements
BookkeeperInternalCallbacks.ReadEnt
@Override
public String toString() {
- return String.format("L%d-E%d", ledgerId, entryId);
+ return String.format("L%d-E%d", entryImpl.getLedgerId(),
entryImpl.getEntryId());
}
}
@@ -493,15 +495,24 @@ class ReadLastConfirmedAndEntryOp implements
BookkeeperInternalCallbacks.ReadEnt
public void readLastConfirmedAndEntryComplete(int rc, long
lastAddConfirmed, LedgerEntry entry);
}
- private void submitCallback(int rc, long lastAddConfirmed, LedgerEntry
entry) {
+ private void submitCallback(int rc) {
long latencyMicros = MathUtils.elapsedMicroSec(requestTimeNano);
+ LedgerEntry entry;
if (BKException.Code.OK != rc) {
lh.bk.getReadLacAndEntryOpLogger()
.registerFailedEvent(latencyMicros, TimeUnit.MICROSECONDS);
+ entry = null;
} else {
+ // could received advanced lac, with no entry
lh.bk.getReadLacAndEntryOpLogger()
.registerSuccessfulEvent(latencyMicros, TimeUnit.MICROSECONDS);
+ if (request.entryImpl.getEntryBuffer() != null) {
+ entry = new LedgerEntry(request.entryImpl);
+ } else {
+ entry = null;
+ }
}
+ request.close();
cb.readLastConfirmedAndEntryComplete(rc, lastAddConfirmed, entry);
}
@@ -537,7 +548,7 @@ class ReadLastConfirmedAndEntryOp implements
BookkeeperInternalCallbacks.ReadEnt
.registerSuccessfulEvent(elapsedMicros,
TimeUnit.MICROSECONDS);
}
- submitCallback(BKException.Code.OK, lastAddConfirmed,
request);
+ submitCallback(BKException.Code.OK);
requestComplete.set(true);
heardFromHostsBitSet.set(rCtx.getBookieIndex(), true);
}
@@ -564,7 +575,7 @@ class ReadLastConfirmedAndEntryOp implements
BookkeeperInternalCallbacks.ReadEnt
return;
}
} else if (BKException.Code.UnauthorizedAccessException == rc &&
!requestComplete.get()) {
- submitCallback(rc, lastAddConfirmed, null);
+ submitCallback(rc);
requestComplete.set(true);
} else {
request.logErrorAndReattemptRead(rCtx.getBookieIndex(), bookie,
"Error: " + BKException.getMessage(rc), rc);
@@ -580,10 +591,10 @@ class ReadLastConfirmedAndEntryOp implements
BookkeeperInternalCallbacks.ReadEnt
if (requestComplete.compareAndSet(false, true)) {
if (!hasValidResponse) {
// no success called
- submitCallback(request.getFirstError(), lastAddConfirmed,
null);
+ submitCallback(request.getFirstError());
} else {
// callback
- submitCallback(BKException.Code.OK, lastAddConfirmed, null);
+ submitCallback(BKException.Code.OK);
}
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/SyncCallbackUtils.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/SyncCallbackUtils.java
index 13bcff8..5ac3080 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/SyncCallbackUtils.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/SyncCallbackUtils.java
@@ -17,13 +17,11 @@
*/
package org.apache.bookkeeper.client;
-import com.google.common.collect.Iterators;
import java.util.Enumeration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
-import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.impl.LastConfirmedAndEntryImpl;
/**
@@ -220,34 +218,6 @@ class SyncCallbackUtils {
}
}
- static class FutureReadResult
- extends
CompletableFuture<Iterable<org.apache.bookkeeper.client.api.LedgerEntry>>
- implements AsyncCallback.ReadCallback {
-
- /**
- * Implementation of callback interface for read method of {@link
ReadHandle}.
- *
- * @param rc
- * return code
- * @param lh
- * ledger handle
- * @param seq
- * sequence of entries
- * @param ctx
- * control object
- */
- @Override
- @SuppressWarnings("unchecked")
- public void readComplete(int rc, LedgerHandle lh,
- Enumeration<LedgerEntry> seq, Object ctx) {
- if (rc != BKException.Code.OK) {
-
this.completeExceptionally(BKException.create(rc).fillInStackTrace());
- } else {
- this.complete((Iterable) () -> Iterators.forEnumeration(seq));
- }
- }
- }
-
static class SyncAddCallback extends CompletableFuture<Long> implements
AsyncCallback.AddCallback {
/**
@@ -320,7 +290,7 @@ class SyncCallbackUtils {
@Override
public void readLastConfirmedAndEntryComplete(int rc, long
lastConfirmed, LedgerEntry entry, Object ctx) {
- LastConfirmedAndEntry result = new
LastConfirmedAndEntryImpl(lastConfirmed, entry);
+ LastConfirmedAndEntry result =
LastConfirmedAndEntryImpl.create(lastConfirmed, entry);
finish(rc, result, this);
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LastConfirmedAndEntry.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LastConfirmedAndEntry.java
index 3a10d96..8bbe58e 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LastConfirmedAndEntry.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LastConfirmedAndEntry.java
@@ -24,14 +24,14 @@ package org.apache.bookkeeper.client.api;
* This contains LastAddConfirmed entryId and a LedgerEntry wanted to read.
* It is used for readLastAddConfirmedAndEntry.
*/
-public interface LastConfirmedAndEntry {
+public interface LastConfirmedAndEntry extends AutoCloseable {
/**
* Gets LastAddConfirmed entryId.
*
* @return the LastAddConfirmed
*/
- Long getLastAddConfirmed();
+ long getLastAddConfirmed();
/**
* Whether this entity contains an entry.
@@ -47,4 +47,9 @@ public interface LastConfirmedAndEntry {
*/
LedgerEntry getEntry();
+ /**
+ * {@inheritDoc}
+ */
+ void close();
+
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LedgerEntry.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LedgerEntry.java
index 97fe377..43bdc34 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LedgerEntry.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LedgerEntry.java
@@ -23,7 +23,6 @@ package org.apache.bookkeeper.client.api;
import io.netty.buffer.ByteBuf;
import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public;
import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
-import org.apache.bookkeeper.conf.ClientConfiguration;
/**
* An entry.
@@ -32,7 +31,7 @@ import org.apache.bookkeeper.conf.ClientConfiguration;
*/
@Public
@Unstable
-public interface LedgerEntry {
+public interface LedgerEntry extends AutoCloseable {
/**
* The id of the ledger which contains the entry.
@@ -56,25 +55,31 @@ public interface LedgerEntry {
long getLength();
/**
- * Returns the content of the entry. This method can be called only once.
While using v2 wire protocol this method
- * will automatically release the internal ByteBuf.
+ * Returns the content of the entry.
*
* @return the content of the entry
- * @throws IllegalStateException if this method is called twice
*/
byte[] getEntry();
/**
* Return the internal buffer that contains the entry payload.
*
- * <p>Note: Using v2 wire protocol it is responsibility of the caller
- * to ensure to release the buffer after usage.
- *
* @return a ByteBuf which contains the data
- *
- * @see ClientConfiguration#setNettyUsePooledBuffers(boolean)
- * @throws IllegalStateException if the entry has been retrieved by {@link
#getEntry()}
*/
ByteBuf getEntryBuffer();
+ /**
+ * Returns a duplicate of this entry.
+ *
+ * <p>This call will retain a slice of the underlying byte buffer.
+ *
+ * @return a duplicated ledger entry.
+ */
+ LedgerEntry duplicate();
+
+ /**
+ * {@inheritDoc}
+ */
+ void close();
+
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LastConfirmedAndEntryImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LastConfirmedAndEntryImpl.java
index 4ed78f9..8f1924a 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LastConfirmedAndEntryImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LastConfirmedAndEntryImpl.java
@@ -20,6 +20,8 @@
*/
package org.apache.bookkeeper.client.impl;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
import org.apache.bookkeeper.client.api.LedgerEntry;
@@ -29,19 +31,37 @@ import org.apache.bookkeeper.client.api.LedgerEntry;
*/
public class LastConfirmedAndEntryImpl implements LastConfirmedAndEntry {
- private final Long lac;
- private final LedgerEntry entry;
+ private static final Recycler<LastConfirmedAndEntryImpl> RECYCLER = new
Recycler<LastConfirmedAndEntryImpl>() {
+ @Override
+ protected LastConfirmedAndEntryImpl
newObject(Handle<LastConfirmedAndEntryImpl> handle) {
+ return new LastConfirmedAndEntryImpl(handle);
+ }
+ };
- public LastConfirmedAndEntryImpl(Long lac, LedgerEntry entry) {
- this.lac = lac;
- this.entry = entry;
+ public static LastConfirmedAndEntryImpl create(long lac,
org.apache.bookkeeper.client.LedgerEntry entry) {
+ LastConfirmedAndEntryImpl entryImpl = RECYCLER.get();
+ entryImpl.lac = lac;
+ entryImpl.entry = LedgerEntryImpl.create(
+ entry.getLedgerId(),
+ entry.getEntryId(),
+ entry.getLength(),
+ entry.getEntryBuffer());
+ return entryImpl;
+ }
+
+ private final Handle<LastConfirmedAndEntryImpl> recycleHandle;
+ private Long lac;
+ private LedgerEntry entry;
+
+ public LastConfirmedAndEntryImpl(Handle<LastConfirmedAndEntryImpl> handle)
{
+ this.recycleHandle = handle;
}
/**
* {@inheritDoc}
*/
@Override
- public Long getLastAddConfirmed() {
+ public long getLastAddConfirmed() {
return lac;
}
@@ -60,4 +80,17 @@ public class LastConfirmedAndEntryImpl implements
LastConfirmedAndEntry {
public LedgerEntry getEntry() {
return entry;
}
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void close() {
+ this.lac = -1L;
+ if (null != entry) {
+ entry.close();
+ entry = null;
+ }
+ recycleHandle.recycle(this);
+ }
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LedgerEntryImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LedgerEntryImpl.java
new file mode 100644
index 0000000..b90f299
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/LedgerEntryImpl.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.client.impl;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+import io.netty.util.ReferenceCountUtil;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+
+/**
+ * Ledger entry. It's a simple tuple containing the ledger id, the entry-id, and
+ * the entry content.
+ */
+public class LedgerEntryImpl implements LedgerEntry {
+
+ private static final Recycler<LedgerEntryImpl> RECYCLER = new
Recycler<LedgerEntryImpl>() {
+ @Override
+ protected LedgerEntryImpl newObject(Handle<LedgerEntryImpl> handle) {
+ return new LedgerEntryImpl(handle);
+ }
+ };
+
+ public static LedgerEntryImpl create(long ledgerId,
+ long entryId) {
+ LedgerEntryImpl entry = RECYCLER.get();
+ entry.ledgerId = ledgerId;
+ entry.entryId = entryId;
+ return entry;
+ }
+
+ public static LedgerEntryImpl create(long ledgerId,
+ long entryId,
+ long length,
+ ByteBuf buf) {
+ LedgerEntryImpl entry = RECYCLER.get();
+ entry.ledgerId = ledgerId;
+ entry.entryId = entryId;
+ entry.length = length;
+ entry.entryBuf = buf;
+ return entry;
+ }
+
+ public static LedgerEntryImpl duplicate(LedgerEntry entry) {
+ return create(
+ entry.getLedgerId(),
+ entry.getEntryId(),
+ entry.getLength(),
+ entry.getEntryBuffer().retainedSlice());
+ }
+
+ private final Handle<LedgerEntryImpl> recycleHandle;
+ private long ledgerId;
+ private long entryId;
+ private long length;
+ private ByteBuf entryBuf;
+
+ private LedgerEntryImpl(Handle<LedgerEntryImpl> handle) {
+ this.recycleHandle = handle;
+ }
+
+ public void setEntryId(long entryId) {
+ this.entryId = entryId;
+ }
+
+ public void setLength(long length) {
+ this.length = length;
+ }
+
+ public void setEntryBuf(ByteBuf buf) {
+ this.entryBuf = buf;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public long getLedgerId() {
+ return ledgerId;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public long getEntryId() {
+ return entryId;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public long getLength() {
+ return length;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public byte[] getEntry() {
+ return ByteBufUtil.getBytes(entryBuf);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public ByteBuf getEntryBuffer() {
+ return entryBuf;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public LedgerEntryImpl duplicate() {
+ return duplicate(this);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void close() {
+ recycle();
+ }
+
+ private void recycle() {
+ this.ledgerId = -1L;
+ this.entryId = -1L;
+ this.length = -1L;
+ ReferenceCountUtil.release(entryBuf);
+ this.entryBuf = null;
+ recycleHandle.recycle(this);
+ }
+}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
index 91ba830..8fb5c02 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
@@ -53,7 +53,6 @@ import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.junit.After;
import org.junit.Before;
-import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
@@ -336,13 +335,13 @@ public abstract class MockBookKeeperTestCase {
fencedLedgers.add(ledgerId);
MockEntry mockEntry = getMockLedgerEntry(ledgerId,
bookieSocketAddress, entryId);
if (mockEntry != null) {
- LOG.info("readEntryAndFenceLedger - found mock entry {}@{}
at {}", ledgerId, entryId, bookieSocketAddress);
+ LOG.info("readEntryAndFenceLedger - found mock entry {}@{}
at {}", entryId, ledgerId, bookieSocketAddress);
ByteBuf entry =
macManager.computeDigestAndPackageForSending(entryId,
mockEntry.lastAddConfirmed,
mockEntry.payload.length,
Unpooled.wrappedBuffer(mockEntry.payload));
callback.readEntryComplete(BKException.Code.OK, ledgerId,
entryId, Unpooled.copiedBuffer(entry), args[5]);
entry.release();
} else {
- LOG.info("readEntryAndFenceLedger - no such mock entry
{}@{} at {}", ledgerId, entryId, bookieSocketAddress);
+ LOG.info("readEntryAndFenceLedger - no such mock entry
{}@{} at {}", entryId, ledgerId, bookieSocketAddress);
callback.readEntryComplete(BKException.Code.NoSuchEntryException, ledgerId,
entryId, null, args[5]);
}
});
@@ -361,13 +360,13 @@ public abstract class MockBookKeeperTestCase {
DigestManager macManager = new CRC32DigestManager(ledgerId);
MockEntry mockEntry = getMockLedgerEntry(ledgerId,
bookieSocketAddress, entryId);
if (mockEntry != null) {
- LOG.info("readEntry - found mock entry {}@{} at {}",
ledgerId, entryId, bookieSocketAddress);
+ LOG.info("readEntry - found mock entry {}@{} at {}",
entryId, ledgerId, bookieSocketAddress);
ByteBuf entry =
macManager.computeDigestAndPackageForSending(entryId,
mockEntry.lastAddConfirmed, mockEntry.payload.length,
Unpooled.wrappedBuffer(mockEntry.payload));
callback.readEntryComplete(BKException.Code.OK, ledgerId,
entryId, Unpooled.copiedBuffer(entry), args[4]);
entry.release();
} else {
- LOG.info("readEntry - no such mock entry {}@{} at {}",
ledgerId, entryId, bookieSocketAddress);
+ LOG.info("readEntry - no such mock entry {}@{} at {}",
entryId, ledgerId, bookieSocketAddress);
callback.readEntryComplete(BKException.Code.NoSuchEntryException, ledgerId,
entryId, null, args[4]);
}
});
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
index 17ba6f1..eff8c3a 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java
@@ -20,13 +20,20 @@
*/
package org.apache.bookkeeper.client;
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
-import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import org.apache.bookkeeper.client.BKException.Code;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
@@ -34,10 +41,6 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Enumeration;
-import java.util.concurrent.CountDownLatch;
-
/**
* Unit tests for parallel reading
*/
@@ -63,39 +66,6 @@ public class TestParallelRead extends
BookKeeperClusterTestCase {
return lh.getId();
}
- static class LatchCallback implements ReadCallback {
-
- final CountDownLatch l = new CountDownLatch(1);
- int rc = -0x1314;
- Enumeration<LedgerEntry> entries;
-
- Enumeration<LedgerEntry> getEntries() {
- return entries;
- }
-
- int getRc() {
- return rc;
- }
-
- @Override
- public void readComplete(int rc, LedgerHandle lh,
Enumeration<LedgerEntry> seq, Object ctx) {
- this.rc = rc;
- this.entries = seq;
- l.countDown();
- }
-
- void expectSuccess() throws Exception {
- l.await();
- assertTrue(BKException.Code.OK == rc);
- }
-
- void expectFail() throws Exception {
- l.await();
- assertFalse(BKException.Code.OK == rc);
- }
-
- }
-
@Test
public void testNormalParallelRead() throws Exception {
int numEntries = 10;
@@ -105,34 +75,34 @@ public class TestParallelRead extends
BookKeeperClusterTestCase {
// read single entry
for (int i = 0; i < numEntries; i++) {
- LatchCallback latch = new LatchCallback();
PendingReadOp readOp =
- new PendingReadOp(lh, lh.bk.scheduler, i, i, latch, null);
- readOp.parallelRead(true).initiate();
- latch.expectSuccess();
- Enumeration<LedgerEntry> entries = latch.getEntries();
- assertNotNull(entries);
- assertTrue(entries.hasMoreElements());
- LedgerEntry entry = entries.nextElement();
+ new PendingReadOp(lh, lh.bk.scheduler, i, i);
+ readOp.parallelRead(true).submit();
+ Iterable<LedgerEntry> iterable = readOp.future().get();
+ assertNotNull(iterable);
+ Iterator<LedgerEntry> entries = iterable.iterator();
+ assertTrue(entries.hasNext());
+ LedgerEntry entry = entries.next();
assertNotNull(entry);
assertEquals(i, Integer.parseInt(new String(entry.getEntry())));
- assertFalse(entries.hasMoreElements());
+ entry.close();
+ assertFalse(entries.hasNext());
}
// read multiple entries
- LatchCallback latch = new LatchCallback();
PendingReadOp readOp =
- new PendingReadOp(lh, lh.bk.scheduler, 0, numEntries - 1,
latch, null);
- readOp.parallelRead(true).initiate();
- latch.expectSuccess();
- Enumeration<LedgerEntry> entries = latch.getEntries();
- assertNotNull(entries);
+ new PendingReadOp(lh, lh.bk.scheduler, 0, numEntries - 1);
+ readOp.parallelRead(true).submit();
+ Iterable<LedgerEntry> iterable = readOp.future().get();
+ assertNotNull(iterable);
+ Iterator<LedgerEntry> iterator = iterable.iterator();
int numReads = 0;
- while (entries.hasMoreElements()) {
- LedgerEntry entry = entries.nextElement();
+ while (iterator.hasNext()) {
+ LedgerEntry entry = iterator.next();
assertNotNull(entry);
assertEquals(numReads, Integer.parseInt(new
String(entry.getEntry())));
+ entry.close();
++numReads;
}
assertEquals(numEntries, numReads);
@@ -140,6 +110,17 @@ public class TestParallelRead extends
BookKeeperClusterTestCase {
lh.close();
}
+ private static <T> void expectFail(CompletableFuture<T> future, int
expectedRc) {
+ try {
+ result(future);
+ fail("Expect to fail");
+ } catch (Exception e) {
+ assertTrue(e instanceof BKException);
+ BKException bke = (BKException) e;
+ assertEquals(expectedRc, bke.getCode());
+ }
+ }
+
@Test
public void testParallelReadMissingEntries() throws Exception {
int numEntries = 10;
@@ -148,19 +129,15 @@ public class TestParallelRead extends
BookKeeperClusterTestCase {
LedgerHandle lh = bkc.openLedger(id, digestType, passwd);
// read single entry
- LatchCallback latch = new LatchCallback();
PendingReadOp readOp =
- new PendingReadOp(lh, lh.bk.scheduler, 11, 11, latch, null);
- readOp.parallelRead(true).initiate();
- latch.expectFail();
- assertEquals(BKException.Code.NoSuchEntryException, latch.getRc());
+ new PendingReadOp(lh, lh.bk.scheduler, 11, 11);
+ readOp.parallelRead(true).submit();
+ expectFail(readOp.future(), Code.NoSuchEntryException);
// read multiple entries
- latch = new LatchCallback();
- readOp = new PendingReadOp(lh, lh.bk.scheduler, 8, 11, latch, null);
- readOp.parallelRead(true).initiate();
- latch.expectFail();
- assertEquals(BKException.Code.NoSuchEntryException, latch.getRc());
+ readOp = new PendingReadOp(lh, lh.bk.scheduler, 8, 11);
+ readOp.parallelRead(true).submit();
+ expectFail(readOp.future(), Code.NoSuchEntryException);
lh.close();
}
@@ -186,13 +163,11 @@ public class TestParallelRead extends
BookKeeperClusterTestCase {
sleepBookie(ensemble.get(0), latch1);
sleepBookie(ensemble.get(1), latch2);
- LatchCallback latchCallback = new LatchCallback();
PendingReadOp readOp =
- new PendingReadOp(lh, lh.bk.scheduler, 10, 10, latchCallback,
null);
- readOp.parallelRead(true).initiate();
+ new PendingReadOp(lh, lh.bk.scheduler, 10, 10);
+ readOp.parallelRead(true).submit();
// would fail immediately if found missing entries don't cover ack
quorum
- latchCallback.expectFail();
- assertEquals(BKException.Code.NoSuchEntryException,
latchCallback.getRc());
+ expectFail(readOp.future(), Code.NoSuchEntryException);
latch1.countDown();
latch2.countDown();
@@ -220,17 +195,16 @@ public class TestParallelRead extends
BookKeeperClusterTestCase {
killBookie(ensemble.get(1));
// read multiple entries
- LatchCallback latch = new LatchCallback();
PendingReadOp readOp =
- new PendingReadOp(lh, lh.bk.scheduler, 0, numEntries - 1,
latch, null);
- readOp.parallelRead(true).initiate();
- latch.expectSuccess();
- Enumeration<LedgerEntry> entries = latch.getEntries();
- assertNotNull(entries);
+ new PendingReadOp(lh, lh.bk.scheduler, 0, numEntries - 1);
+ readOp.parallelRead(true).submit();
+ Iterable<LedgerEntry> iterable = readOp.future().get();
+ assertNotNull(iterable);
+ Iterator<LedgerEntry> entries = iterable.iterator();
int numReads = 0;
- while (entries.hasMoreElements()) {
- LedgerEntry entry = entries.nextElement();
+ while (entries.hasNext()) {
+ LedgerEntry entry = entries.next();
assertNotNull(entry);
assertEquals(numReads, Integer.parseInt(new
String(entry.getEntry())));
++numReads;
@@ -262,12 +236,10 @@ public class TestParallelRead extends
BookKeeperClusterTestCase {
killBookie(ensemble.get(2));
// read multiple entries
- LatchCallback latch = new LatchCallback();
PendingReadOp readOp =
- new PendingReadOp(lh, lh.bk.scheduler, 0, numEntries - 1,
latch, null);
- readOp.parallelRead(true).initiate();
- latch.expectFail();
- assertEquals(BKException.Code.BookieHandleNotAvailableException,
latch.getRc());
+ new PendingReadOp(lh, lh.bk.scheduler, 0, numEntries - 1);
+ readOp.parallelRead(true).submit();
+ expectFail(readOp.future(), Code.BookieHandleNotAvailableException);
lh.close();
newBk.close();
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadEntryListener.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadEntryListener.java
index 11e46e9..fb9a747 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadEntryListener.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadEntryListener.java
@@ -24,6 +24,10 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.net.BookieSocketAddress;
import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryListener;
@@ -32,11 +36,6 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
/**
* Unit tests for {@link
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryListener}.
*/
@@ -87,11 +86,17 @@ public class TestReadEntryListener extends
BookKeeperClusterTestCase {
@Override
public void onEntryComplete(int rc, LedgerHandle lh, LedgerEntry
entry, Object ctx) {
- if (nextEntryId != entry.getEntryId()) {
- inOrder = false;
+ long entryId;
+ if (BKException.Code.OK == rc) {
+ if (nextEntryId != entry.getEntryId()) {
+ inOrder = false;
+ }
+ entryId = entry.getEntryId();
+ } else {
+ entryId = nextEntryId;
}
+ resultCodes.put(entryId, new EntryWithRC(rc, entry));
++nextEntryId;
- resultCodes.put(entry.getEntryId(), new EntryWithRC(rc, entry));
l.countDown();
}
@@ -115,7 +120,7 @@ public class TestReadEntryListener extends
BookKeeperClusterTestCase {
LatchListener listener = new LatchListener(i, 1);
ListenerBasedPendingReadOp readOp =
new ListenerBasedPendingReadOp(lh, lh.bk.scheduler, i, i,
listener, null);
- readOp.parallelRead(parallelRead).initiate();
+ readOp.parallelRead(parallelRead).submit();
listener.expectComplete();
assertEquals(1, listener.resultCodes.size());
EntryWithRC entry = listener.resultCodes.get((long) i);
@@ -129,7 +134,7 @@ public class TestReadEntryListener extends
BookKeeperClusterTestCase {
LatchListener listener = new LatchListener(0L, numEntries);
ListenerBasedPendingReadOp readOp =
new ListenerBasedPendingReadOp(lh, lh.bk.scheduler, 0,
numEntries - 1, listener, null);
- readOp.parallelRead(parallelRead).initiate();
+ readOp.parallelRead(parallelRead).submit();
listener.expectComplete();
assertEquals(numEntries, listener.resultCodes.size());
for (int i = 0; i < numEntries; i++) {
@@ -163,7 +168,7 @@ public class TestReadEntryListener extends
BookKeeperClusterTestCase {
LatchListener listener = new LatchListener(11L, 1);
ListenerBasedPendingReadOp readOp =
new ListenerBasedPendingReadOp(lh, lh.bk.scheduler, 11, 11,
listener, null);
- readOp.parallelRead(parallelRead).initiate();
+ readOp.parallelRead(parallelRead).submit();
listener.expectComplete();
assertEquals(1, listener.resultCodes.size());
EntryWithRC entry = listener.resultCodes.get(11L);
@@ -174,7 +179,7 @@ public class TestReadEntryListener extends
BookKeeperClusterTestCase {
// read multiple missing entries
listener = new LatchListener(11L, 3);
readOp = new ListenerBasedPendingReadOp(lh, lh.bk.scheduler, 11, 13,
listener, null);
- readOp.parallelRead(parallelRead).initiate();
+ readOp.parallelRead(parallelRead).submit();
listener.expectComplete();
assertEquals(3, listener.resultCodes.size());
assertTrue(listener.isInOrder());
@@ -188,7 +193,7 @@ public class TestReadEntryListener extends
BookKeeperClusterTestCase {
// read multiple entries with missing entries
listener = new LatchListener(5L, 10);
readOp = new ListenerBasedPendingReadOp(lh, lh.bk.scheduler, 5L, 14L,
listener, null);
- readOp.parallelRead(parallelRead).initiate();
+ readOp.parallelRead(parallelRead).submit();
listener.expectComplete();
assertEquals(10, listener.resultCodes.size());
assertTrue(listener.isInOrder());
@@ -234,7 +239,7 @@ public class TestReadEntryListener extends
BookKeeperClusterTestCase {
LatchListener listener = new LatchListener(0L, numEntries);
ListenerBasedPendingReadOp readOp =
new ListenerBasedPendingReadOp(lh, lh.bk.scheduler, 0,
numEntries - 1, listener, null);
- readOp.parallelRead(parallelRead).initiate();
+ readOp.parallelRead(parallelRead).submit();
listener.expectComplete();
assertEquals(numEntries, listener.resultCodes.size());
for (int i = 0; i < numEntries; i++) {
@@ -265,7 +270,7 @@ public class TestReadEntryListener extends
BookKeeperClusterTestCase {
LedgerHandle lh = bkc.openLedger(id, digestType, passwd);
ArrayList<BookieSocketAddress> ensemble =
- lh.getLedgerMetadata().getEnsemble(5);
+ lh.getLedgerMetadata().getEnsemble(5);
// kill bookies
killBookie(ensemble.get(0));
killBookie(ensemble.get(1));
@@ -274,8 +279,8 @@ public class TestReadEntryListener extends
BookKeeperClusterTestCase {
// read multiple entries
LatchListener listener = new LatchListener(0L, numEntries);
ListenerBasedPendingReadOp readOp =
- new ListenerBasedPendingReadOp(lh, lh.bk.scheduler, 0,
numEntries - 1, listener, null);
- readOp.parallelRead(parallelRead).initiate();
+ new ListenerBasedPendingReadOp(lh, lh.bk.scheduler, 0, numEntries
- 1, listener, null);
+ readOp.parallelRead(parallelRead).submit();
listener.expectComplete();
assertEquals(numEntries, listener.resultCodes.size());
for (int i = 0; i < numEntries; i++) {
@@ -294,11 +299,11 @@ public class TestReadEntryListener extends
BookKeeperClusterTestCase {
@Test
public void testReadFailureWithFailedBookiesEnableParallelRead() throws
Exception {
- readWithFailedBookiesTest(true);
+ readFailureWithFailedBookiesTest(true);
}
@Test
public void testReadFailureWithFailedBookiesDisableParallelRead() throws
Exception {
- readWithFailedBookiesTest(false);
+ readFailureWithFailedBookiesTest(false);
}
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
index 5ca752a..acfeb8c 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
@@ -20,6 +20,10 @@
*/
package org.apache.bookkeeper.client;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Enumeration;
@@ -34,8 +38,6 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.junit.Assert.*;
-
/**
* This unit test tests ledger fencing;
*
@@ -292,9 +294,7 @@ public class TestSpeculativeRead extends
BookKeeperClusterTestCase {
secondHostOnly.set(1, true);
PendingReadOp.LedgerEntryRequest req0 = null, req2 = null, req4 = null;
try {
- LatchCallback latch0 = new LatchCallback();
- PendingReadOp op = new PendingReadOp(l, bkspec.scheduler,
- 0, 5, latch0, null);
+ PendingReadOp op = new PendingReadOp(l, bkspec.scheduler, 0, 5);
// if we've already heard from all hosts,
// we only send the initial read
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java
index 8a3d68f..bbaa358 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java
@@ -207,8 +207,9 @@ public class BookKeeperApiTest extends
MockBookKeeperTestCase {
// test readLastAddConfirmedAndEntry
LastConfirmedAndEntry lastConfirmedAndEntry =
result(reader.readLastAddConfirmedAndEntry(0, 999, false));
- assertEquals(2,
lastConfirmedAndEntry.getLastAddConfirmed().intValue());
+ assertEquals(2L, lastConfirmedAndEntry.getLastAddConfirmed());
assertArrayEquals(data,
lastConfirmedAndEntry.getEntry().getEntry());
+ lastConfirmedAndEntry.close();
}
}
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].