Author: ivank
Date: Thu Jul 28 18:33:52 2011
New Revision: 1151958
URL: http://svn.apache.org/viewvc?rev=1151958&view=rev
Log:
BOOKKEEPER-11: Read from open ledger (fpj via ivank)
Added:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/AsyncCallback.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieReadWriteTest.java
zookeeper/bookkeeper/trunk/doc/bookkeeperProgrammer.textile
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1151958&r1=1151957&r2=1151958&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Thu Jul 28 18:33:52 2011
@@ -20,3 +20,4 @@ BUGFIXES:
BOOKKEEPER-30: Test are too noisy (ivank via fpj)
+ BOOKKEEPER-11: Read from open ledger (fpj via ivank)
Modified:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/AsyncCallback.java
URL:
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/AsyncCallback.java?rev=1151958&r1=1151957&r2=1151958&view=diff
==============================================================================
---
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/AsyncCallback.java
(original)
+++
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/AsyncCallback.java
Thu Jul 28 18:33:52 2011
@@ -122,5 +122,18 @@ public interface AsyncCallback {
*/
void recoverComplete(int rc, Object ctx);
}
+
+ public interface ReadLastConfirmedCallback {
+ /**
+ * Callback definition for read last confirmed operations
+ *
+ * @param rc
+ * return code
+ * @param lastConfirmed
+ * last confirmed entry id reported by the operation
+ * @param ctx
+ * control object
+ */
+ void readLastConfirmedComplete(int rc, long lastConfirmed, Object ctx);
+ }
+
}
Modified:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
URL:
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java?rev=1151958&r1=1151957&r2=1151958&view=diff
==============================================================================
---
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
(original)
+++
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
Thu Jul 28 18:33:52 2011
@@ -283,11 +283,36 @@ public class BookKeeper implements OpenC
public void asyncOpenLedger(long lId, DigestType digestType, byte passwd[],
OpenCallback cb, Object ctx) {
- new LedgerOpenOp(this, lId, digestType, passwd, cb, ctx).initiate();
+ new LedgerOpenOp(this, lId, digestType, passwd, false, cb,
ctx).initiate();
}
/**
+ * Open existing ledger asynchronously for reading, but it does not try to
+ * recover the ledger if it is not yet closed. The application needs to use
+ * it carefully, since the writer might have crashed, and the ledger will
+ * remain unsealed forever if there is no external mechanism to detect the
+ * failure of the writer and to open the ledger in a safe manner, invoking
+ * the recovery procedure.
+ *
+ * @param lId
+ * ledger identifier
+ * @param digestType
+ * digest type, either MAC or CRC32
+ * @param passwd
+ * password
+ * @param cb
+ * callback object
+ * @param ctx
+ * optional control object
+ */
+
+    public void asyncOpenLedgerNoRecovery(long lId, DigestType digestType, byte passwd[],
+                                          OpenCallback cb, Object ctx) {
+        // Same open operation as asyncOpenLedger, but with the 'unsafe' flag
+        // set, so that LedgerOpenOp does not start ledger recovery.
+        final LedgerOpenOp openOp = new LedgerOpenOp(this, lId, digestType, passwd, true, cb, ctx);
+        openOp.initiate();
+    }
+
+ /**
* Callback method for synchronous open operation
*
* @param rc
@@ -342,6 +367,40 @@ public class BookKeeper implements OpenC
}
/**
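+ * Synchronous, unsafe open ledger call: opens the ledger for reading
+ * without triggering the recovery procedure.
+ *
+ * @param lId
+ * ledger identifier
+ * @param digestType
+ * digest type, either MAC or CRC32
+ * @param passwd
+ * password
+ * @return a handle to the opened ledger
+ * @throws InterruptedException
+ * @throws BKException
+ * if the open operation completes with a return code other than OK
+ */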
+
+    public LedgerHandle openLedgerNoRecovery(long lId, DigestType digestType, byte passwd[])
+            throws BKException, InterruptedException {
+        // One outstanding operation: the asynchronous open issued below.
+        final SyncCounter syncCounter = new SyncCounter();
+        syncCounter.inc();
+
+        // This BookKeeper instance acts as the open callback; the counter
+        // travels as the control object.
+        asyncOpenLedgerNoRecovery(lId, digestType, passwd, this, syncCounter);
+
+        // Wait for the open to complete.
+        syncCounter.block(0);
+
+        if (syncCounter.getrc() == BKException.Code.OK) {
+            return syncCounter.getLh();
+        }
+        throw BKException.create(syncCounter.getrc());
+    }
+
+ /**
* Deletes a ledger asynchronously.
*
* @param lId
Modified:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
URL:
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=1151958&r1=1151957&r2=1151958&view=diff
==============================================================================
---
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
(original)
+++
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
Thu Jul 28 18:33:52 2011
@@ -29,6 +29,7 @@ import java.util.Enumeration;
import java.util.Queue;
import java.util.concurrent.Semaphore;
+import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
@@ -51,9 +52,10 @@ import org.jboss.netty.buffer.ChannelBuf
* Ledger handle contains ledger metadata and is used to access the read and
* write operations to a ledger.
*/
-public class LedgerHandle implements ReadCallback, AddCallback, CloseCallback {
+public class LedgerHandle implements ReadCallback, AddCallback, CloseCallback,
ReadLastConfirmedCallback {
final static Logger LOG = Logger.getLogger(LedgerHandle.class);
-
+ final static long LAST_ADD_CONFIRMED = -1;
+
final byte[] ledgerKey;
final LedgerMetadata metadata;
final BookKeeper bk;
@@ -381,6 +383,64 @@ public class LedgerHandle implements Rea
}
}
+    /**
+     * Asynchronously obtains the last confirmed write from a quorum of
+     * bookies.
+     *
+     * @param cb
+     *            callback that receives the result
+     * @param ctx
+     *            control object handed back to the callback
+     */
+    public void asyncReadLastConfirmed(ReadLastConfirmedCallback cb, Object ctx) {
+        final ReadLastConfirmedOp readOp = new ReadLastConfirmedOp(this, cb, ctx);
+        readOp.initiate();
+    }
+
+
+    /**
+     * Context object for the synchronous call to read last confirmed. It
+     * carries the return code and the last confirmed value from the callback
+     * to the waiting caller.
+     *
+     * Completion is tracked with an explicit flag rather than by comparing
+     * the value against -1: -1 is itself a possible answer (nothing confirmed
+     * yet), and using it as the "not ready" marker would leave the waiter
+     * blocked forever in that case.
+     */
+    class LastConfirmedCtx {
+        long response;
+        int rc;
+        // True once setLastConfirmed() has stored a result.
+        boolean completed;
+
+        LastConfirmedCtx() {
+            this.response = -1;
+            this.completed = false;
+        }
+
+        /**
+         * Stores the result and marks this context as ready.
+         *
+         * @param lastConfirmed
+         *            value delivered by the callback (may be -1)
+         */
+        void setLastConfirmed(long lastConfirmed) {
+            this.response = lastConfirmed;
+            this.completed = true;
+        }
+
+        /**
+         * @return the stored value, or -1 if none has been stored yet
+         */
+        long getlastConfirmed() {
+            return this.response;
+        }
+
+        void setRC(int rc) {
+            this.rc = rc;
+        }
+
+        int getRC() {
+            return this.rc;
+        }
+
+        /**
+         * @return true once a result has been stored, whatever its value
+         */
+        boolean ready() {
+            return this.completed;
+        }
+    }
+
+    /**
+     * Synchronous variant of asyncReadLastConfirmed: issues the asynchronous
+     * call and blocks until its callback has stored a result.
+     *
+     * @return the last confirmed value delivered to the callback
+     * @throws InterruptedException
+     *             if the calling thread is interrupted while waiting
+     * @throws BKException
+     *             if the operation completes with a return code other than OK
+     */
+    public long readLastConfirmed()
+            throws InterruptedException, BKException {
+        final LastConfirmedCtx syncCtx = new LastConfirmedCtx();
+        // This handle is the callback; it fills in syncCtx and notifies it.
+        asyncReadLastConfirmed(this, syncCtx);
+
+        synchronized (syncCtx) {
+            // Re-check after every wake-up until a result is available.
+            while (!syncCtx.ready()) {
+                syncCtx.wait();
+            }
+        }
+
+        if (syncCtx.getRC() == BKException.Code.OK) {
+            return syncCtx.getlastConfirmed();
+        }
+        throw BKException.create(syncCtx.getRC());
+    }
+
// close the ledger and send fails to all the adds in the pipeline
void handleUnrecoverableErrorDuringAdd(int rc) {
asyncClose(NoopCloseCallback.instance, null, rc);
@@ -527,6 +587,21 @@ public class LedgerHandle implements Rea
counter.dec();
}
+
+
+    /**
+     * Callback used by the synchronous read last confirmed method: records
+     * the outcome in the waiting context and wakes up the waiter.
+     */
+    public void readLastConfirmedComplete(int rc, long lastConfirmed, Object ctx) {
+        // The control object is the context created by readLastConfirmed().
+        final LastConfirmedCtx waiterCtx = (LastConfirmedCtx) ctx;
+
+        synchronized (waiterCtx) {
+            waiterCtx.setRC(rc);
+            waiterCtx.setLastConfirmed(lastConfirmed);
+            waiterCtx.notify();
+        }
+    }
+
/**
* Close callback method
*
Modified:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
URL:
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java?rev=1151958&r1=1151957&r2=1151958&view=diff
==============================================================================
---
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
(original)
+++
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
Thu Jul 28 18:33:52 2011
@@ -39,13 +39,14 @@ import org.apache.zookeeper.data.Stat;
class LedgerOpenOp implements DataCallback {
static final Logger LOG = Logger.getLogger(LedgerOpenOp.class);
- BookKeeper bk;
- long ledgerId;
- OpenCallback cb;
- Object ctx;
+ final BookKeeper bk;
+ final long ledgerId;
+ final OpenCallback cb;
+ final Object ctx;
LedgerHandle lh;
- byte[] passwd;
- DigestType digestType;
+ final byte[] passwd;
+ final DigestType digestType;
+ final boolean unsafe;
/**
* Constructor.
@@ -58,13 +59,14 @@ class LedgerOpenOp implements DataCallba
* @param ctx
*/
- public LedgerOpenOp(BookKeeper bk, long ledgerId, DigestType digestType,
byte[] passwd, OpenCallback cb, Object ctx) {
+ public LedgerOpenOp(BookKeeper bk, long ledgerId, DigestType digestType,
byte[] passwd, boolean unsafe, OpenCallback cb, Object ctx) {
this.bk = bk;
this.ledgerId = ledgerId;
this.passwd = passwd;
this.cb = cb;
this.ctx = ctx;
this.digestType = digestType;
+ this.unsafe = unsafe;
}
/**
@@ -126,15 +128,16 @@ class LedgerOpenOp implements DataCallba
return;
}
- lh.recover(new GenericCallback<Void>() {
- @Override
- public void operationComplete(int rc, Void result) {
- if (rc != BKException.Code.OK) {
- cb.openComplete(BKException.Code.LedgerRecoveryException,
null, LedgerOpenOp.this.ctx);
- } else {
+ if(!unsafe) {
+ lh.recover(new GenericCallback<Void>() {
+ @Override
+ public void operationComplete(int rc, Void result) {
+ if (rc != BKException.Code.OK) {
+
cb.openComplete(BKException.Code.LedgerRecoveryException, null,
LedgerOpenOp.this.ctx);
+ } else {
cb.openComplete(BKException.Code.OK, lh,
LedgerOpenOp.this.ctx);
+ }
}
- }
- });
+ });
+ } else {
+ // Opened without recovery: no recovery callback will ever fire, so the
+ // open callback has to be completed here with the handle as it is.
+ // Without this branch the caller of the no-recovery open never gets
+ // an answer for a ledger that is still open.
+ cb.openComplete(BKException.Code.OK, lh, LedgerOpenOp.this.ctx);
+ }
}
}
Modified:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
URL:
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java?rev=1151958&r1=1151957&r2=1151958&view=diff
==============================================================================
---
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
(original)
+++
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
Thu Jul 28 18:33:52 2011
@@ -32,9 +32,10 @@ import org.jboss.netty.buffer.ChannelBuf
/**
* This class encapsulated the ledger recovery operation. It first does a read
- * with entry-id of -1 to all bookies. Then starting from the last confirmed
- * entry (from hints in the ledger entries), it reads forward until it is not
- * able to find a particular entry. It closes the ledger at that entry.
+ * with entry-id of -1 (LedgerHandle.LAST_ADD_CONFIRMED) to all bookies. Then
+ * starting from the last confirmed entry (from hints in the ledger entries),
+ * it reads forward until it is not able to find a particular entry. It closes
+ * the ledger at that entry.
*
*/
class LedgerRecoveryOp implements ReadEntryCallback, ReadCallback, AddCallback
{
@@ -56,7 +57,7 @@ class LedgerRecoveryOp implements ReadEn
public void initiate() {
for (int i = 0; i < lh.metadata.currentEnsemble.size(); i++) {
- lh.bk.bookieClient.readEntry(lh.metadata.currentEnsemble.get(i),
lh.ledgerId, -1, this, i);
+ lh.bk.bookieClient.readEntry(lh.metadata.currentEnsemble.get(i),
lh.ledgerId, LedgerHandle.LAST_ADD_CONFIRMED, this, i);
}
}
Added:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
URL:
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java?rev=1151958&view=auto
==============================================================================
---
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
(added)
+++
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
Thu Jul 28 18:33:52 2011
@@ -0,0 +1,101 @@
+package org.apache.bookkeeper.client;
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.Enumeration;
+
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
+import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
+import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
+import org.apache.bookkeeper.client.LedgerHandle.NoopCloseCallback;
+import org.apache.bookkeeper.client.DigestManager.RecoveryData;
+import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.log4j.Logger;
+import org.jboss.netty.buffer.ChannelBuffer;
+
+/**
+ * This class encapsulated the read last confirmed operation.
+ *
+ */
+class ReadLastConfirmedOp implements ReadEntryCallback {
+ static final Logger LOG = Logger.getLogger(LedgerRecoveryOp.class);
+ LedgerHandle lh;
+ Object ctx;
+ int numResponsesPending;
+ int validResponses;
+ long maxAddConfirmed;
+ long maxLength = 0;
+ volatile boolean notComplete = true;
+
+ ReadLastConfirmedCallback cb;
+
+ public ReadLastConfirmedOp(LedgerHandle lh, ReadLastConfirmedCallback cb,
Object ctx) {
+ this.cb = cb;
+ this.ctx = ctx;
+ this.lh = lh;
+ this.validResponses = 0;
+ this.numResponsesPending = lh.metadata.ensembleSize;
+ }
+
+ public void initiate() {
+ for (int i = 0; i < lh.metadata.currentEnsemble.size(); i++) {
+ lh.bk.bookieClient.readEntry(lh.metadata.currentEnsemble.get(i),
lh.ledgerId, LedgerHandle.LAST_ADD_CONFIRMED, this, i);
+ }
+ }
+
+ public synchronized void readEntryComplete(final int rc, final long
ledgerId, final long entryId,
+ final ChannelBuffer buffer, final Object ctx) {
+ int bookieIndex = (Integer) ctx;
+
+ numResponsesPending--;
+
+ if (rc == BKException.Code.OK) {
+ try {
+ RecoveryData recoveryData =
lh.macManager.verifyDigestAndReturnLastConfirmed(buffer);
+ maxAddConfirmed = Math.max(maxAddConfirmed,
recoveryData.lastAddConfirmed);
+ validResponses++;
+ } catch (BKDigestMatchException e) {
+ // Too bad, this bookie didn't give us a valid answer, we
+ // still might be able to recover though so continue
+ LOG.error("Mac mismatch while reading last entry from bookie: "
+ + lh.metadata.currentEnsemble.get(bookieIndex));
+ }
+ }
+
+ if (rc == BKException.Code.NoSuchLedgerExistsException || rc ==
BKException.Code.NoSuchEntryException) {
+ // this still counts as a valid response, e.g., if the client
crashed without writing any entry
+ validResponses++;
+ }
+
+ // other return codes dont count as valid responses
+ if ((validResponses >= lh.metadata.quorumSize) &&
+ notComplete) {
+ notComplete = false;
+ cb.readLastConfirmedComplete(BKException.Code.OK, maxAddConfirmed,
this.ctx);
+ return;
+ }
+
+ if (numResponsesPending == 0) {
+ // Have got all responses back but was still not enough, just fail
the operation
+ LOG.error("While recovering ledger: " + ledgerId + " did not hear
success responses from all quorums");
+
cb.readLastConfirmedComplete(BKException.Code.LedgerRecoveryException,
maxAddConfirmed, ctx);
+ }
+
+ }
+}
Modified:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieReadWriteTest.java
URL:
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieReadWriteTest.java?rev=1151958&r1=1151957&r2=1151958&view=diff
==============================================================================
---
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieReadWriteTest.java
(original)
+++
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieReadWriteTest.java
Thu Jul 28 18:33:52 2011
@@ -36,11 +36,12 @@ import java.util.concurrent.Semaphore;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
+import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.streaming.LedgerInputStream;
import org.apache.bookkeeper.streaming.LedgerOutputStream;
@@ -58,7 +59,8 @@ import org.junit.Test;
*
*/
-public class BookieReadWriteTest extends BaseTestCase implements AddCallback,
ReadCallback {
+public class BookieReadWriteTest extends BaseTestCase
+implements AddCallback, ReadCallback, ReadLastConfirmedCallback {
// Depending on the taste, select the amount of logging
// by decommenting one of the two lines below
@@ -88,11 +90,13 @@ public class BookieReadWriteTest extends
Set<Object> syncObjs;
+ // Object a test thread waits on while a callback fills in the result.
class SyncObj {
+ // Set by readLastConfirmedComplete; stays at -1 until then.
+ long lastConfirmed;
volatile int counter;
boolean value;
public SyncObj() {
counter = 0;
+ lastConfirmed = -1;
value = false;
}
}
@@ -714,6 +718,157 @@ public class BookieReadWriteTest extends
}
}
+ /**
+ * Writes entries to a ledger, opens the same ledger without recovery while
+ * it is still being written, and then checks the last confirmed value with
+ * both the synchronous and the asynchronous call.
+ *
+ * NOTE(review): unlike testLastConfirmedAdd, this method carries no @Test
+ * annotation here - confirm whether it is meant to run.
+ */
+ public void testReadFromOpenLedger() throws IOException {
+ try {
+ // Create a BookKeeper client and a ledger
+ bkc = new BookKeeper("127.0.0.1");
+ lh = bkc.createLedger(digestType, ledgerPassword);
+ // bkc.initMessageDigest("SHA1");
+ ledgerId = lh.getId();
+ LOG.info("Ledger ID: " + lh.getId());
+ for (int i = 0; i < numEntriesToWrite; i++) {
+ ByteBuffer entry = ByteBuffer.allocate(4);
+ entry.putInt(rng.nextInt(maxInt));
+ entry.position(0);
+
+ entries.add(entry.array());
+ entriesSize.add(entry.array().length);
+ lh.addEntry(entry.array());
+ // Half way through, open the ledger again without recovery.
+ // NOTE(review): lhOpen is never used; the read below goes through
+ // the writer handle lh - confirm which handle was intended.
+ if(i == numEntriesToWrite/2){
+ LedgerHandle lhOpen = bkc.openLedgerNoRecovery(ledgerId,
digestType, ledgerPassword);
+ Enumeration<LedgerEntry> readEntry = lh.readEntries(i, i);
+ assertTrue("Enumeration of ledger entries has no element",
readEntry.hasMoreElements() == true);
+ }
+ }
+
+ // Synchronous read of the last confirmed value.
+ // NOTE(review): the expected value is numEntriesToWrite - 2, presumably
+ // because the hint stored with the last entry refers to the entry
+ // before it - confirm.
+ long last = lh.readLastConfirmed();
+ assertTrue("Last confirmed add: " + last, last ==
(numEntriesToWrite - 2));
+
+ LOG.debug("*** WRITE COMPLETE ***");
+ // close ledger
+ lh.close();
+ /*
+ * Asynchronous call to read last confirmed entry
+ */
+ lh = bkc.createLedger(digestType, ledgerPassword);
+ // bkc.initMessageDigest("SHA1");
+ ledgerId = lh.getId();
+ LOG.info("Ledger ID: " + lh.getId());
+ for (int i = 0; i < numEntriesToWrite; i++) {
+ ByteBuffer entry = ByteBuffer.allocate(4);
+ entry.putInt(rng.nextInt(maxInt));
+ entry.position(0);
+
+ entries.add(entry.array());
+ entriesSize.add(entry.array().length);
+ lh.addEntry(entry.array());
+ }
+
+
+ // This test instance is the callback; sync is its control object.
+ SyncObj sync = new SyncObj();
+ lh.asyncReadLastConfirmed(this, sync);
+
+ // Wait for last confirmed; -1 means the callback has not stored a value yet
+ synchronized (sync) {
+ while (sync.lastConfirmed == -1) {
+ LOG.debug("Counter = " + sync.lastConfirmed);
+ sync.wait();
+ }
+ }
+
+ assertTrue("Last confirmed add: " + sync.lastConfirmed,
sync.lastConfirmed == (numEntriesToWrite - 2));
+
+ LOG.debug("*** WRITE COMPLETE ***");
+ // close ledger
+ lh.close();
+
+
+ } catch (KeeperException e) {
+ LOG.error("Test failed", e);
+ fail("Test failed due to ZooKeeper exception");
+ } catch (BKException e) {
+ LOG.error("Test failed", e);
+ fail("Test failed due to BookKeeper exception");
+ } catch (InterruptedException e) {
+ LOG.error("Test failed", e);
+ fail("Test failed due to interruption");
+ }
+ }
+
+
+ /**
+ * Writes entries to a ledger and checks the last confirmed value, first
+ * with the synchronous call and then, on a second ledger, with the
+ * asynchronous call.
+ */
+ @Test
+ public void testLastConfirmedAdd() throws IOException {
+ try {
+ // Create a BookKeeper client and a ledger
+ bkc = new BookKeeper("127.0.0.1");
+ lh = bkc.createLedger(digestType, ledgerPassword);
+ // bkc.initMessageDigest("SHA1");
+ ledgerId = lh.getId();
+ LOG.info("Ledger ID: " + lh.getId());
+ for (int i = 0; i < numEntriesToWrite; i++) {
+ ByteBuffer entry = ByteBuffer.allocate(4);
+ entry.putInt(rng.nextInt(maxInt));
+ entry.position(0);
+
+ entries.add(entry.array());
+ entriesSize.add(entry.array().length);
+ lh.addEntry(entry.array());
+ }
+
+ // Synchronous read of the last confirmed value.
+ // NOTE(review): the expected value is numEntriesToWrite - 2, presumably
+ // because the hint stored with the last entry refers to the entry
+ // before it - confirm.
+ long last = lh.readLastConfirmed();
+ assertTrue("Last confirmed add: " + last, last ==
(numEntriesToWrite - 2));
+
+ LOG.debug("*** WRITE COMPLETE ***");
+ // close ledger
+ lh.close();
+ /*
+ * Asynchronous call to read last confirmed entry
+ */
+ lh = bkc.createLedger(digestType, ledgerPassword);
+ // bkc.initMessageDigest("SHA1");
+ ledgerId = lh.getId();
+ LOG.info("Ledger ID: " + lh.getId());
+ for (int i = 0; i < numEntriesToWrite; i++) {
+ ByteBuffer entry = ByteBuffer.allocate(4);
+ entry.putInt(rng.nextInt(maxInt));
+ entry.position(0);
+
+ entries.add(entry.array());
+ entriesSize.add(entry.array().length);
+ lh.addEntry(entry.array());
+ }
+
+
+ // This test instance is the callback; sync is its control object.
+ SyncObj sync = new SyncObj();
+ lh.asyncReadLastConfirmed(this, sync);
+
+ // Wait for last confirmed; -1 means the callback has not stored a value yet
+ synchronized (sync) {
+ while (sync.lastConfirmed == -1) {
+ LOG.debug("Counter = " + sync.lastConfirmed);
+ sync.wait();
+ }
+ }
+
+ assertTrue("Last confirmed add: " + sync.lastConfirmed,
sync.lastConfirmed == (numEntriesToWrite - 2));
+
+ LOG.debug("*** WRITE COMPLETE ***");
+ // close ledger
+ lh.close();
+
+
+ } catch (KeeperException e) {
+ LOG.error("Test failed", e);
+ fail("Test failed due to ZooKeeper exception");
+ } catch (BKException e) {
+ LOG.error("Test failed", e);
+ fail("Test failed due to BookKeeper exception");
+ } catch (InterruptedException e) {
+ LOG.error("Test failed", e);
+ fail("Test failed due to interruption");
+ }
+ }
+
public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx)
{
if(rc != BKException.Code.OK) fail("Return code is not OK: " + rc);
@@ -737,6 +892,15 @@ public class BookieReadWriteTest extends
}
}
+    /**
+     * Read last confirmed callback of the test: stores the value in the
+     * SyncObj passed as control object and wakes up the waiting test thread.
+     */
+    public void readLastConfirmedComplete(int rc, long lastConfirmed, Object ctx) {
+        final SyncObj waiter = (SyncObj) ctx;
+
+        synchronized (waiter) {
+            waiter.lastConfirmed = lastConfirmed;
+            waiter.notify();
+        }
+    }
+
@Before
public void setUp() throws Exception{
super.setUp();
Modified: zookeeper/bookkeeper/trunk/doc/bookkeeperProgrammer.textile
URL:
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/doc/bookkeeperProgrammer.textile?rev=1151958&r1=1151957&r2=1151958&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/doc/bookkeeperProgrammer.textile (original)
+++ zookeeper/bookkeeper/trunk/doc/bookkeeperProgrammer.textile Thu Jul 28
18:33:52 2011
@@ -145,7 +145,7 @@ p. To read from a ledger, a client must
@public LedgerHandle openLedger(long lId, DigestType type, byte passwd[])
throws InterruptedException, BKException @
-* @ledgerId@ is the ledger identifier;
+* @lId@ is the ledger identifier;
* @type@ is the type of digest used with entries: either MAC or CRC32.
* @passwd@ is a password to access the ledger (used only in the case of
@VERIFIABLE@ ledgers);
@@ -164,6 +164,30 @@ p. where:
* @lh@ is a @LedgerHandle@ object to manipulate a ledger;
* @ctx@ is control object used for accountability purposes.
+p. The two calls above to open a ledger recover the ledger if it has not been
closed properly before it reads entries from it. It is also possible to open a
ledger and read from it without triggering the recovery process with the
following methods of @org.apache.bookkeeper.client.BookKeeper@ .
+
+ _Synchronous open:_
+
+ @public LedgerHandle openLedgerNoRecovery(long lId, DigestType type, byte
passwd[]) throws InterruptedException, BKException @
+
+* @lId@ is the ledger identifier;
+* @type@ is the type of digest used with entries: either MAC or CRC32.
+* @passwd@ is a password to access the ledger (used only in the case of
@VERIFIABLE@ ledgers);
+
+
+ _Asynchronous open:_
+
+ @public void asyncOpenLedgerNoRecovery(long lId, DigestType type, byte
passwd[], OpenCallback cb, Object ctx) @
+
+p. It also takes a ledger identifier and a password. Additionally, it takes a
callback object @cb@ and a control object @ctx@ . The callback object must
implement the @OpenCallback@ interface in
@org.apache.bookkeeper.client.AsyncCallback@ , and a class implementing it has
to implement a method called @openComplete@ that has the following signature:
+
+ @public void openComplete(int rc, LedgerHandle lh, Object ctx) @
+
+p. where:
+
+* @rc@ is a return code (please refer to @org.apache.bookkeeper.client.BKDefs@
for a list);
+* @lh@ is a @LedgerHandle@ object to manipulate a ledger;
+* @ctx@ is control object used for accountability purposes.
h1. Reading from ledger
@@ -217,3 +241,24 @@ p. where:
* @rc@ is a return code (please refer to @org.apache.bookeeper.client.BKDefs@
for a list);
* @ctx@ is control object used for accountability purposes.
+h1. Reading the last confirmed entry from a ledger
+
+p. When reading from an open ledger (opening without recovery enables it), it
is often necessary to read the last confirmed hint that the writer client
writes along with every entry. The general idea is to obtain the hint from a
quorum and return the largest value. The following methods belong to
@org.apache.bookkeeper.client.LedgerHandle@ .
+
+ _Synchronous read:_
+
+ @public long readLastConfirmed() throws InterruptedException, BKException @
+
+ _Asynchronous read:_
+
+ @public void asyncReadLastConfirmed(ReadLastConfirmedCallback cb, Object ctx)
@
+
+p. It takes a callback object @cb@ and a control object @ctx@ . The callback
object must implement the @ReadLastConfirmedCallback@ interface in
@org.apache.bookkeeper.client.AsyncCallback@ , and a class implementing it has
to implement a method called @readLastConfirmedComplete@ that has the following
signature:
+
+@public void readLastConfirmedComplete (int rc, long lastConfirmed, Object
ctx) @
+
+p. where:
+
+* @rc@ is a return code (please refer to @org.apache.bookkeeper.client.BKDefs@
for a list);
+* @lastConfirmed@ is the maximum last confirmed hint received among a quorum
of bookies;
+* @ctx@ is control object used for accountability purposes.
\ No newline at end of file