Author: ivank
Date: Wed Nov 7 16:27:20 2012
New Revision: 1406707
URL: http://svn.apache.org/viewvc?rev=1406707&view=rev
Log:
BOOKKEEPER-444: Refactor pending read op to make speculative reads possible
(ivank)
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1406707&r1=1406706&r2=1406707&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Wed Nov 7 16:27:20 2012
@@ -178,6 +178,8 @@ Trunk (unreleased changes)
BOOKKEEPER-346: Detect IOExceptions in LedgerCache and bookie should look at next ledger dir(if any) (Vinay via ivank)
+ BOOKKEEPER-444: Refactor pending read op to make speculative reads possible (ivank)
+
hedwig-server:
BOOKKEEPER-250: Need a ledger manager like interface to manage metadata operations in Hedwig (sijie via ivank)
Modified:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java
URL:
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java?rev=1406707&r1=1406706&r2=1406707&view=diff
==============================================================================
---
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java
(original)
+++
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java
Wed Nov 7 16:27:20 2012
@@ -42,8 +42,6 @@ public class LedgerEntry {
long length;
ChannelBufferInputStream entryDataStream;
- int nextReplicaIndexToReadFrom = 0;
-
LedgerEntry(long lId, long eId) {
this.ledgerId = lId;
this.entryId = eId;
Modified:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
URL:
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java?rev=1406707&r1=1406706&r2=1406707&view=diff
==============================================================================
---
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
(original)
+++
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
Wed Nov 7 16:27:20 2012
@@ -20,25 +20,21 @@ package org.apache.bookkeeper.client;
* under the License.
*
*/
-
import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.NoSuchElementException;
import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
-import org.apache.bookkeeper.proto.BookieProtocol;
import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferInputStream;
-import java.io.IOException;
-
/**
* Sequence of entries of a ledger that represents a pending read operation.
* When all the data read has come back, the application callback is called.
@@ -46,27 +42,105 @@ import java.io.IOException;
* application as soon as it arrives rather than waiting for the whole thing.
*
*/
-
class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
Logger LOG = LoggerFactory.getLogger(PendingReadOp.class);
- Queue<LedgerEntry> seq;
+ Queue<LedgerEntryRequest> seq;
ReadCallback cb;
Object ctx;
LedgerHandle lh;
- long numPendingReads;
+ long numPendingEntries;
long startEntryId;
long endEntryId;
+ private class LedgerEntryRequest extends LedgerEntry {
+ int nextReplicaIndexToReadFrom = 0;
+ AtomicBoolean complete = new AtomicBoolean(false);
+
+ int firstError = BKException.Code.OK;
+
+ final ArrayList<InetSocketAddress> ensemble;
+
+ LedgerEntryRequest(ArrayList<InetSocketAddress> ensemble, long lId,
long eId) {
+ super(lId, eId);
+
+ this.ensemble = ensemble;
+ }
+
+ void sendNextRead() {
+ if (nextReplicaIndexToReadFrom >=
lh.metadata.getWriteQuorumSize()) {
+ // we are done, the read has failed from all replicas, just
fail the
+ // read
+ submitCallback(firstError);
+ return;
+ }
+
+ int bookieIndex =
lh.distributionSchedule.getWriteSet(entryId).get(nextReplicaIndexToReadFrom);
+ nextReplicaIndexToReadFrom++;
+
+ try {
+ sendReadTo(ensemble.get(bookieIndex), this);
+ } catch (InterruptedException ie) {
+ LOG.error("Interrupted reading entry " + this, ie);
+ Thread.currentThread().interrupt();
+ submitCallback(BKException.Code.ReadException);
+ }
+ }
+
+ void logErrorAndReattemptRead(String errMsg, int rc) {
+ if (firstError == BKException.Code.OK) {
+ firstError = rc;
+ }
+
+ int bookieIndex =
lh.distributionSchedule.getWriteSet(entryId).get(nextReplicaIndexToReadFrom -
1);
+ LOG.error(errMsg + " while reading entry: " + entryId + "
ledgerId: " + lh.ledgerId + " from bookie: "
+ + ensemble.get(bookieIndex));
+
+ sendNextRead();
+ }
+
+ // return true if we managed to complete the entry
+ boolean complete(final ChannelBuffer buffer) {
+ ChannelBufferInputStream is;
+ try {
+ is = lh.macManager.verifyDigestAndReturnData(entryId, buffer);
+ } catch (BKDigestMatchException e) {
+ logErrorAndReattemptRead("Mac mismatch",
BKException.Code.DigestMatchException);
+ return false;
+ }
+
+ if (!complete.getAndSet(true)) {
+ entryDataStream = is;
+
+ /*
+ * The length is a long and it is the last field of the
metadata of an entry.
+ * Consequently, we have to subtract 8 from METADATA_LENGTH to
get the length.
+ */
+ length = buffer.getLong(DigestManager.METADATA_LENGTH - 8);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ boolean isComplete() {
+ return complete.get();
+ }
+
+ public String toString() {
+ return String.format("L%d-E%d", ledgerId, entryId);
+ }
+ }
+
PendingReadOp(LedgerHandle lh, long startEntryId, long endEntryId,
ReadCallback cb, Object ctx) {
- seq = new ArrayDeque<LedgerEntry>((int) (endEntryId - startEntryId));
+ seq = new ArrayDeque<LedgerEntryRequest>((int) (endEntryId -
startEntryId));
this.cb = cb;
this.ctx = ctx;
this.lh = lh;
this.startEntryId = startEntryId;
this.endEntryId = endEntryId;
- numPendingReads = endEntryId - startEntryId + 1;
+ numPendingEntries = endEntryId - startEntryId + 1;
}
public void initiate() throws InterruptedException {
@@ -76,47 +150,30 @@ class PendingReadOp implements Enumerati
do {
LOG.debug("Acquiring lock: {}", i);
- lh.opCounterSem.acquire();
-
if (i == nextEnsembleChange) {
ensemble = lh.metadata.getEnsemble(i);
nextEnsembleChange = lh.metadata.getNextEnsembleChange(i);
}
- LedgerEntry entry = new LedgerEntry(lh.ledgerId, i);
+ LedgerEntryRequest entry = new LedgerEntryRequest(ensemble,
lh.ledgerId, i);
seq.add(entry);
i++;
- sendRead(ensemble, entry, BKException.Code.ReadException);
+ entry.sendNextRead();
} while (i <= endEntryId);
}
- void sendRead(ArrayList<InetSocketAddress> ensemble, LedgerEntry entry,
int lastErrorCode) {
- if (entry.nextReplicaIndexToReadFrom >=
lh.metadata.getWriteQuorumSize()) {
- // we are done, the read has failed from all replicas, just fail
the
- // read
- lh.opCounterSem.release();
- submitCallback(lastErrorCode);
- return;
- }
+ void sendReadTo(InetSocketAddress to, LedgerEntryRequest entry) throws
InterruptedException {
+ lh.opCounterSem.acquire();
- int bookieIndex =
lh.distributionSchedule.getWriteSet(entry.entryId).get(entry.nextReplicaIndexToReadFrom);
- entry.nextReplicaIndexToReadFrom++;
- lh.bk.bookieClient.readEntry(ensemble.get(bookieIndex), lh.ledgerId,
entry.entryId,
+ lh.bk.bookieClient.readEntry(to, lh.ledgerId, entry.entryId,
this, entry);
}
- void logErrorAndReattemptRead(LedgerEntry entry, String errMsg, int rc) {
- ArrayList<InetSocketAddress> ensemble =
lh.metadata.getEnsemble(entry.entryId);
- int bookeIndex =
lh.distributionSchedule.getWriteSet(entry.entryId).get(entry.nextReplicaIndexToReadFrom
- 1);
- LOG.error(errMsg + " while reading entry: " + entry.entryId + "
ledgerId: " + lh.ledgerId + " from bookie: "
- + ensemble.get(bookeIndex));
- sendRead(ensemble, entry, rc);
- return;
- }
-
@Override
public void readEntryComplete(int rc, long ledgerId, final long entryId,
final ChannelBuffer buffer, Object ctx) {
- final LedgerEntry entry = (LedgerEntry) ctx;
+ final LedgerEntryRequest entry = (LedgerEntryRequest) ctx;
+
+ lh.opCounterSem.release();
// if we just read only one entry, and this entry is not existed (in
recoveryRead case)
// we don't need to do ReattemptRead, otherwise we could not handle
following case:
@@ -127,43 +184,25 @@ class PendingReadOp implements Enumerati
if (startEntryId == endEntryId) {
if (BKException.Code.NoSuchLedgerExistsException == rc ||
BKException.Code.NoSuchEntryException == rc) {
- lh.opCounterSem.release();
submitCallback(rc);
return;
}
}
if (rc != BKException.Code.OK) {
- logErrorAndReattemptRead(entry, "Error: " +
BKException.getMessage(rc), rc);
+ entry.logErrorAndReattemptRead("Error: " +
BKException.getMessage(rc), rc);
return;
}
- ChannelBufferInputStream is;
- try {
- is = lh.macManager.verifyDigestAndReturnData(entryId, buffer);
- } catch (BKDigestMatchException e) {
- logErrorAndReattemptRead(entry, "Mac mismatch",
BKException.Code.DigestMatchException);
- return;
+ if (entry.complete(buffer)) {
+ numPendingEntries--;
}
- entry.entryDataStream = is;
-
- /*
- * The length is a long and it is the last field of the metadata of an
entry.
- * Consequently, we have to subtract 8 from METADATA_LENGTH to get the
length.
- */
- entry.length = buffer.getLong(DigestManager.METADATA_LENGTH - 8);
-
- numPendingReads--;
- if (numPendingReads == 0) {
+ if (numPendingEntries == 0) {
submitCallback(BKException.Code.OK);
}
- LOG.debug("Releasing lock: {}", entryId);
-
- lh.opCounterSem.release();
-
- if(numPendingReads < 0)
+ if(numPendingEntries < 0)
LOG.error("Read too many values");
}