Author: ivank
Date: Wed Nov  7 16:27:20 2012
New Revision: 1406707

URL: http://svn.apache.org/viewvc?rev=1406707&view=rev
Log:
BOOKKEEPER-444: Refactor pending read op to make speculative reads possible (ivank)

Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1406707&r1=1406706&r2=1406707&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Wed Nov  7 16:27:20 2012
@@ -178,6 +178,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-346: Detect IOExceptions in LedgerCache and bookie should look at next ledger dir(if any) (Vinay via ivank)
 
+        BOOKKEEPER-444: Refactor pending read op to make speculative reads possible (ivank)
+
       hedwig-server:
 
         BOOKKEEPER-250: Need a ledger manager like interface to manage metadata operations in Hedwig (sijie via ivank)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java?rev=1406707&r1=1406706&r2=1406707&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java Wed Nov  7 16:27:20 2012
@@ -42,8 +42,6 @@ public class LedgerEntry {
     long length;
     ChannelBufferInputStream entryDataStream;
 
-    int nextReplicaIndexToReadFrom = 0;
-
     LedgerEntry(long lId, long eId) {
         this.ledgerId = lId;
         this.entryId = eId;

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java?rev=1406707&r1=1406706&r2=1406707&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java Wed Nov  7 16:27:20 2012
@@ -20,25 +20,21 @@ package org.apache.bookkeeper.client;
  * under the License.
  *
  */
-
 import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.NoSuchElementException;
 import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
-import org.apache.bookkeeper.proto.BookieProtocol;
 import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBufferInputStream;
 
-import java.io.IOException;
-
 /**
  * Sequence of entries of a ledger that represents a pending read operation.
  * When all the data read has come back, the application callback is called.
@@ -46,27 +42,105 @@ import java.io.IOException;
  * application as soon as it arrives rather than waiting for the whole thing.
  *
  */
-
 class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
     Logger LOG = LoggerFactory.getLogger(PendingReadOp.class);
 
-    Queue<LedgerEntry> seq;
+    Queue<LedgerEntryRequest> seq;
     ReadCallback cb;
     Object ctx;
     LedgerHandle lh;
-    long numPendingReads;
+    long numPendingEntries;
     long startEntryId;
     long endEntryId;
 
+    private class LedgerEntryRequest extends LedgerEntry {
+        int nextReplicaIndexToReadFrom = 0;
+        AtomicBoolean complete = new AtomicBoolean(false);
+
+        int firstError = BKException.Code.OK;
+
+        final ArrayList<InetSocketAddress> ensemble;
+
+        LedgerEntryRequest(ArrayList<InetSocketAddress> ensemble, long lId, 
long eId) {
+            super(lId, eId);
+
+            this.ensemble = ensemble;
+        }
+
+        void sendNextRead() {
+            if (nextReplicaIndexToReadFrom >= 
lh.metadata.getWriteQuorumSize()) {
+                // we are done, the read has failed from all replicas, just 
fail the
+                // read
+                submitCallback(firstError);
+                return;
+            }
+
+            int bookieIndex = 
lh.distributionSchedule.getWriteSet(entryId).get(nextReplicaIndexToReadFrom);
+            nextReplicaIndexToReadFrom++;
+
+            try {
+                sendReadTo(ensemble.get(bookieIndex), this);
+            } catch (InterruptedException ie) {
+                LOG.error("Interrupted reading entry " + this, ie);
+                Thread.currentThread().interrupt();
+                submitCallback(BKException.Code.ReadException);
+            }
+        }
+
+        void logErrorAndReattemptRead(String errMsg, int rc) {
+            if (firstError == BKException.Code.OK) {
+                firstError = rc;
+            }
+
+            int bookieIndex = 
lh.distributionSchedule.getWriteSet(entryId).get(nextReplicaIndexToReadFrom - 
1);
+            LOG.error(errMsg + " while reading entry: " + entryId + " 
ledgerId: " + lh.ledgerId + " from bookie: "
+                      + ensemble.get(bookieIndex));
+
+            sendNextRead();
+        }
+
+        // return true if we managed to complete the entry
+        boolean complete(final ChannelBuffer buffer) {
+            ChannelBufferInputStream is;
+            try {
+                is = lh.macManager.verifyDigestAndReturnData(entryId, buffer);
+            } catch (BKDigestMatchException e) {
+                logErrorAndReattemptRead("Mac mismatch", 
BKException.Code.DigestMatchException);
+                return false;
+            }
+
+            if (!complete.getAndSet(true)) {
+                entryDataStream = is;
+
+                /*
+                 * The length is a long and it is the last field of the 
metadata of an entry.
+                 * Consequently, we have to subtract 8 from METADATA_LENGTH to 
get the length.
+                 */
+                length = buffer.getLong(DigestManager.METADATA_LENGTH - 8);
+                return true;
+            } else {
+                return false;
+            }
+        }
+
+        boolean isComplete() {
+            return complete.get();
+        }
+
+        public String toString() {
+            return String.format("L%d-E%d", ledgerId, entryId);
+        }
+    }
+
     PendingReadOp(LedgerHandle lh, long startEntryId, long endEntryId, 
ReadCallback cb, Object ctx) {
 
-        seq = new ArrayDeque<LedgerEntry>((int) (endEntryId - startEntryId));
+        seq = new ArrayDeque<LedgerEntryRequest>((int) (endEntryId - 
startEntryId));
         this.cb = cb;
         this.ctx = ctx;
         this.lh = lh;
         this.startEntryId = startEntryId;
         this.endEntryId = endEntryId;
-        numPendingReads = endEntryId - startEntryId + 1;
+        numPendingEntries = endEntryId - startEntryId + 1;
     }
 
     public void initiate() throws InterruptedException {
@@ -76,47 +150,30 @@ class PendingReadOp implements Enumerati
         do {
             LOG.debug("Acquiring lock: {}", i);
 
-            lh.opCounterSem.acquire();
-
             if (i == nextEnsembleChange) {
                 ensemble = lh.metadata.getEnsemble(i);
                 nextEnsembleChange = lh.metadata.getNextEnsembleChange(i);
             }
-            LedgerEntry entry = new LedgerEntry(lh.ledgerId, i);
+            LedgerEntryRequest entry = new LedgerEntryRequest(ensemble, 
lh.ledgerId, i);
             seq.add(entry);
             i++;
-            sendRead(ensemble, entry, BKException.Code.ReadException);
 
+            entry.sendNextRead();
         } while (i <= endEntryId);
     }
 
-    void sendRead(ArrayList<InetSocketAddress> ensemble, LedgerEntry entry, 
int lastErrorCode) {
-        if (entry.nextReplicaIndexToReadFrom >= 
lh.metadata.getWriteQuorumSize()) {
-            // we are done, the read has failed from all replicas, just fail 
the
-            // read
-            lh.opCounterSem.release();
-            submitCallback(lastErrorCode);
-            return;
-        }
+    void sendReadTo(InetSocketAddress to, LedgerEntryRequest entry) throws 
InterruptedException {
+        lh.opCounterSem.acquire();
 
-        int bookieIndex = 
lh.distributionSchedule.getWriteSet(entry.entryId).get(entry.nextReplicaIndexToReadFrom);
-        entry.nextReplicaIndexToReadFrom++;
-        lh.bk.bookieClient.readEntry(ensemble.get(bookieIndex), lh.ledgerId, 
entry.entryId, 
+        lh.bk.bookieClient.readEntry(to, lh.ledgerId, entry.entryId, 
                                      this, entry);
     }
 
-    void logErrorAndReattemptRead(LedgerEntry entry, String errMsg, int rc) {
-        ArrayList<InetSocketAddress> ensemble = 
lh.metadata.getEnsemble(entry.entryId);
-        int bookeIndex = 
lh.distributionSchedule.getWriteSet(entry.entryId).get(entry.nextReplicaIndexToReadFrom
 - 1);
-        LOG.error(errMsg + " while reading entry: " + entry.entryId + " 
ledgerId: " + lh.ledgerId + " from bookie: "
-                  + ensemble.get(bookeIndex));
-        sendRead(ensemble, entry, rc);
-        return;
-    }
-
     @Override
     public void readEntryComplete(int rc, long ledgerId, final long entryId, 
final ChannelBuffer buffer, Object ctx) {
-        final LedgerEntry entry = (LedgerEntry) ctx;
+        final LedgerEntryRequest entry = (LedgerEntryRequest) ctx;
+
+        lh.opCounterSem.release();
 
         // if we just read only one entry, and this entry is not existed (in 
recoveryRead case)
         // we don't need to do ReattemptRead, otherwise we could not handle 
following case:
@@ -127,43 +184,25 @@ class PendingReadOp implements Enumerati
         if (startEntryId == endEntryId) {
             if (BKException.Code.NoSuchLedgerExistsException == rc ||
                 BKException.Code.NoSuchEntryException == rc) {
-                lh.opCounterSem.release();
                 submitCallback(rc);
                 return;
             }
         }
 
         if (rc != BKException.Code.OK) {
-            logErrorAndReattemptRead(entry, "Error: " + 
BKException.getMessage(rc), rc);
+            entry.logErrorAndReattemptRead("Error: " + 
BKException.getMessage(rc), rc);
             return;
         }
 
-        ChannelBufferInputStream is;
-        try {
-            is = lh.macManager.verifyDigestAndReturnData(entryId, buffer);
-        } catch (BKDigestMatchException e) {
-            logErrorAndReattemptRead(entry, "Mac mismatch", 
BKException.Code.DigestMatchException);
-            return;
+        if (entry.complete(buffer)) {
+            numPendingEntries--;
         }
 
-        entry.entryDataStream = is;
-
-        /*
-         * The length is a long and it is the last field of the metadata of an 
entry.
-         * Consequently, we have to subtract 8 from METADATA_LENGTH to get the 
length.
-         */
-        entry.length = buffer.getLong(DigestManager.METADATA_LENGTH - 8);
-
-        numPendingReads--;
-        if (numPendingReads == 0) {
+        if (numPendingEntries == 0) {
             submitCallback(BKException.Code.OK);
         }
 
-        LOG.debug("Releasing lock: {}", entryId);
-
-        lh.opCounterSem.release();
-
-        if(numPendingReads < 0)
+        if(numPendingEntries < 0)
             LOG.error("Read too many values");
     }
 


Reply via email to