Repository: bookkeeper Updated Branches: refs/heads/master c72ff5efb -> 9c79e078b
BOOKKEEPER-1019: Support for reading entries after LAC This patch introduces a new client-side configuration option to allow reads outside the boundary of the local LastAddConfirmed value. Author: eolivelli <[email protected]> Reviewers: Sijie Guo <[email protected]>, Matteo Merli <[email protected]> Closes #121 from eolivelli/BOOKKEEPER-1019-read-after-lac Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/9c79e078 Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/9c79e078 Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/9c79e078 Branch: refs/heads/master Commit: 9c79e078b8cfefc24251aefcb727760fb99229ed Parents: c72ff5e Author: eolivelli <[email protected]> Authored: Tue May 2 22:07:24 2017 +0200 Committer: Enrico Olivelli <[email protected]> Committed: Tue May 2 22:07:24 2017 +0200 ---------------------------------------------------------------------- .../apache/bookkeeper/client/LedgerHandle.java | 63 ++++++ .../bookkeeper/conf/ClientConfiguration.java | 1 + .../bookkeeper/client/BookKeeperTest.java | 220 ++++++++++++++++++- 3 files changed, 283 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9c79e078/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java index b68bb11..d1e5540 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java @@ -470,6 +470,7 @@ public class LedgerHandle implements AutoCloseable { * @param lastEntry * id of last entry of sequence (included) * + * @see #asyncReadEntries(long, long, org.apache.bookkeeper.client.AsyncCallback.ReadCallback, java.lang.Object) */ public Enumeration<LedgerEntry> readEntries(long firstEntry, long lastEntry) throws InterruptedException, BKException { @@ -481,6 +482,29 @@ public class LedgerHandle implements AutoCloseable { } /** + * Read a sequence of entries synchronously, allowing to read after the LastAddConfirmed range.<br> + * This is the same of + * {@link #asyncReadUnconfirmedEntries(long, long, org.apache.bookkeeper.client.AsyncCallback.ReadCallback, java.lang.Object) } + * + * @param firstEntry + * id of first entry of sequence (included) + * @param lastEntry + * id of last entry of sequence (included) + * + * @see #readEntries(long, long) + * @see #asyncReadUnconfirmedEntries(long, long, org.apache.bookkeeper.client.AsyncCallback.ReadCallback, java.lang.Object) + * @see #asyncReadLastConfirmed(org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback, java.lang.Object) + */ + public Enumeration<LedgerEntry> readUnconfirmedEntries(long firstEntry, long lastEntry) + throws InterruptedException, BKException { + CompletableFuture<Enumeration<LedgerEntry>> counter = new CompletableFuture<>(); + + asyncReadUnconfirmedEntries(firstEntry, lastEntry, new SyncReadCallback(), counter); + + return SynchCallbackUtils.waitForResult(counter); + } + + /** * Read a sequence of entries asynchronously. * * @param firstEntry @@ -511,6 +535,45 @@ public class LedgerHandle implements AutoCloseable { asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx); } + /** + * Read a sequence of entries asynchronously, allowing to read after the LastAddConfirmed range. + * <br>This is the same of + * {@link #asyncReadEntries(long, long, org.apache.bookkeeper.client.AsyncCallback.ReadCallback, java.lang.Object) } + * but it lets the client read without checking the local value of LastAddConfirmed, so that it is possibile to + * read entries for which the writer has not received the acknowledge yet. <br> + * For entries which are within the range 0..LastAddConfirmed BookKeeper guarantees that the writer has successfully + * received the acknowledge.<br> + * For entries outside that range it is possible that the writer never received the acknoledge + * and so there is the risk that the reader is seeing entries before the writer and this could result in a consistency + * issue in some cases.<br> + * With this method you can even read entries before the LastAddConfirmed and entries after it with one call, + * the expected consistency will be as described above for each subrange of ids. + * + * @param firstEntry + * id of first entry of sequence + * @param lastEntry + * id of last entry of sequence + * @param cb + * object implementing read callback interface + * @param ctx + * control object + * + * @see #asyncReadEntries(long, long, org.apache.bookkeeper.client.AsyncCallback.ReadCallback, java.lang.Object) + * @see #asyncReadLastConfirmed(org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback, java.lang.Object) + * @see #readUnconfirmedEntries(long, long) + */ + public void asyncReadUnconfirmedEntries(long firstEntry, long lastEntry, ReadCallback cb, Object ctx) { + // Little sanity check + if (firstEntry < 0 || firstEntry > lastEntry) { + LOG.error("IncorrectParameterException on ledgerId:{} firstEntry:{} lastEntry:{}", + new Object[] { ledgerId, firstEntry, lastEntry }); + cb.readComplete(BKException.Code.IncorrectParameterException, this, null, ctx); + return; + } + + asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx); + } + void asyncReadEntriesInternal(long firstEntry, long lastEntry, ReadCallback cb, Object ctx) { try { new PendingReadOp(this, bk.scheduler, http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9c79e078/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java index 2b75e9e..6b913d4 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java @@ -1072,4 +1072,5 @@ public class ClientConfiguration extends AbstractConfiguration { public String getClientRole() { return getString(CLIENT_ROLE, CLIENT_ROLE_STANDARD); } + } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9c79e078/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java index 0097028..17d63b3 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java @@ -1,5 +1,6 @@ package org.apache.bookkeeper.client; +import java.util.Collections; import java.util.Enumeration; /* @@ -432,5 +433,222 @@ public class BookKeeperTest extends BaseTestCase { rlh.close(); wlh.close(); bkcWithExplicitLAC.close(); - } + } + + @Test(timeout = 60000) + public void testReadAfterLastAddConfirmed() throws Exception { + + ClientConfiguration clientConfiguration = new ClientConfiguration() + .setZkServers(zkUtil.getZooKeeperConnectString()); + + try (BookKeeper bkWriter = new BookKeeper(clientConfiguration);) { + LedgerHandle writeLh = bkWriter.createLedger(digestType, "testPasswd".getBytes()); + long ledgerId = writeLh.getId(); + int numOfEntries = 5; + for (int i = 0; i < numOfEntries; i++) { + writeLh.addEntry(("foobar" + i).getBytes()); + } + + try (BookKeeper bkReader = new BookKeeper(clientConfiguration); + LedgerHandle rlh = bkReader.openLedgerNoRecovery(ledgerId, digestType, "testPasswd".getBytes());) { + Assert.assertTrue( + "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(), + (rlh.getLastAddConfirmed() == (numOfEntries - 2))); + + Assert.assertFalse(writeLh.isClosed()); + + // with readUnconfirmedEntries we are able to read all of the entries + Enumeration<LedgerEntry> entries = rlh.readUnconfirmedEntries(0, numOfEntries - 1); + int entryId = 0; + while (entries.hasMoreElements()) { + LedgerEntry entry = entries.nextElement(); + String entryString = new String(entry.getEntry()); + Assert.assertTrue("Expected entry String: " + ("foobar" + entryId) + + " actual entry String: " + entryString, + entryString.equals("foobar" + entryId)); + entryId++; + } + } + + try (BookKeeper bkReader = new BookKeeper(clientConfiguration); + LedgerHandle rlh = bkReader.openLedgerNoRecovery(ledgerId, digestType, "testPasswd".getBytes());) { + Assert.assertTrue( + "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(), + (rlh.getLastAddConfirmed() == (numOfEntries - 2))); + + Assert.assertFalse(writeLh.isClosed()); + + // without readUnconfirmedEntries we are not able to read all of the entries + try { + rlh.readEntries(0, numOfEntries - 1); + fail("shoud not be able to read up to "+ (numOfEntries - 1) + " with readEntries"); + } catch (BKException.BKReadException expected) { + } + + // read all entries within the 0..LastAddConfirmed range with readEntries + assertEquals(rlh.getLastAddConfirmed() + 1, + Collections.list(rlh.readEntries(0, rlh.getLastAddConfirmed())).size()); + + // assert local LAC does not change after reads + Assert.assertTrue( + "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(), + (rlh.getLastAddConfirmed() == (numOfEntries - 2))); + + // read all entries within the 0..LastAddConfirmed range with readUnconfirmedEntries + assertEquals(rlh.getLastAddConfirmed() + 1, + Collections.list(rlh.readUnconfirmedEntries(0, rlh.getLastAddConfirmed())).size()); + + // assert local LAC does not change after reads + Assert.assertTrue( + "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(), + (rlh.getLastAddConfirmed() == (numOfEntries - 2))); + + // read all entries within the LastAddConfirmed..numOfEntries - 1 range with readUnconfirmedEntries + assertEquals(numOfEntries - rlh.getLastAddConfirmed(), + Collections.list(rlh.readUnconfirmedEntries(rlh.getLastAddConfirmed(), numOfEntries - 1)).size()); + + // assert local LAC does not change after reads + Assert.assertTrue( + "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(), + (rlh.getLastAddConfirmed() == (numOfEntries - 2))); + + try { + // read all entries within the LastAddConfirmed..numOfEntries range with readUnconfirmedEntries + // this is an error, we are going outside the range of existing entries + rlh.readUnconfirmedEntries(rlh.getLastAddConfirmed(), numOfEntries); + fail("the read tried to access data for unexisting entry id "+numOfEntries); + } catch (BKException.BKNoSuchEntryException expected) { + // expecting a BKNoSuchEntryException, as the entry does not exist on bookies + } + + try { + // read all entries within the LastAddConfirmed..numOfEntries range with readEntries + // this is an error, we are going outside the range of existing entries + rlh.readEntries(rlh.getLastAddConfirmed(), numOfEntries); + fail("the read tries to access data for unexisting entry id "+numOfEntries); + } catch (BKException.BKReadException expected) { + // expecting a BKReadException, as the client rejected the request to access entries + // after local LastAddConfirmed + } + + } + + // ensure that after restarting every bookie entries are not lost + // even entries after the LastAddConfirmed + restartBookies(); + + try (BookKeeper bkReader = new BookKeeper(clientConfiguration); + LedgerHandle rlh = bkReader.openLedgerNoRecovery(ledgerId, digestType, "testPasswd".getBytes());) { + Assert.assertTrue( + "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(), + (rlh.getLastAddConfirmed() == (numOfEntries - 2))); + + Assert.assertFalse(writeLh.isClosed()); + + // with readUnconfirmedEntries we are able to read all of the entries + Enumeration<LedgerEntry> entries = rlh.readUnconfirmedEntries(0, numOfEntries - 1); + int entryId = 0; + while (entries.hasMoreElements()) { + LedgerEntry entry = entries.nextElement(); + String entryString = new String(entry.getEntry()); + Assert.assertTrue("Expected entry String: " + ("foobar" + entryId) + + " actual entry String: " + entryString, + entryString.equals("foobar" + entryId)); + entryId++; + } + } + + try (BookKeeper bkReader = new BookKeeper(clientConfiguration); + LedgerHandle rlh = bkReader.openLedgerNoRecovery(ledgerId, digestType, "testPasswd".getBytes());) { + Assert.assertTrue( + "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(), + (rlh.getLastAddConfirmed() == (numOfEntries - 2))); + + Assert.assertFalse(writeLh.isClosed()); + + // without readUnconfirmedEntries we are not able to read all of the entries + try { + rlh.readEntries(0, numOfEntries - 1); + fail("shoud not be able to read up to "+ (numOfEntries - 1) + " with readEntries"); + } catch (BKException.BKReadException expected) { + } + + // read all entries within the 0..LastAddConfirmed range with readEntries + assertEquals(rlh.getLastAddConfirmed() + 1, + Collections.list(rlh.readEntries(0, rlh.getLastAddConfirmed())).size()); + + // assert local LAC does not change after reads + Assert.assertTrue( + "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(), + (rlh.getLastAddConfirmed() == (numOfEntries - 2))); + + // read all entries within the 0..LastAddConfirmed range with readUnconfirmedEntries + assertEquals(rlh.getLastAddConfirmed() + 1, + Collections.list(rlh.readUnconfirmedEntries(0, rlh.getLastAddConfirmed())).size()); + + // assert local LAC does not change after reads + Assert.assertTrue( + "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(), + (rlh.getLastAddConfirmed() == (numOfEntries - 2))); + + // read all entries within the LastAddConfirmed..numOfEntries - 1 range with readUnconfirmedEntries + assertEquals(numOfEntries - rlh.getLastAddConfirmed(), + Collections.list(rlh.readUnconfirmedEntries(rlh.getLastAddConfirmed(), numOfEntries - 1)).size()); + + // assert local LAC does not change after reads + Assert.assertTrue( + "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(), + (rlh.getLastAddConfirmed() == (numOfEntries - 2))); + + try { + // read all entries within the LastAddConfirmed..numOfEntries range with readUnconfirmedEntries + // this is an error, we are going outside the range of existing entries + rlh.readUnconfirmedEntries(rlh.getLastAddConfirmed(), numOfEntries); + fail("the read tried to access data for unexisting entry id "+numOfEntries); + } catch (BKException.BKNoSuchEntryException expected) { + // expecting a BKNoSuchEntryException, as the entry does not exist on bookies + } + + try { + // read all entries within the LastAddConfirmed..numOfEntries range with readEntries + // this is an error, we are going outside the range of existing entries + rlh.readEntries(rlh.getLastAddConfirmed(), numOfEntries); + fail("the read tries to access data for unexisting entry id "+numOfEntries); + } catch (BKException.BKReadException expected) { + // expecting a BKReadException, as the client rejected the request to access entries + // after local LastAddConfirmed + } + + } + + // open ledger with fencing, this will repair the ledger and make the last entry readable + try (BookKeeper bkReader = new BookKeeper(clientConfiguration); + LedgerHandle rlh = bkReader.openLedger(ledgerId, digestType, "testPasswd".getBytes());) { + Assert.assertTrue( + "Expected LAC of rlh: " + (numOfEntries - 1) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(), + (rlh.getLastAddConfirmed() == (numOfEntries - 1))); + + Assert.assertFalse(writeLh.isClosed()); + + // without readUnconfirmedEntries we are not able to read all of the entries + Enumeration<LedgerEntry> entries = rlh.readEntries(0, numOfEntries - 1); + int entryId = 0; + while (entries.hasMoreElements()) { + LedgerEntry entry = entries.nextElement(); + String entryString = new String(entry.getEntry()); + Assert.assertTrue("Expected entry String: " + ("foobar" + entryId) + + " actual entry String: " + entryString, + entryString.equals("foobar" + entryId)); + entryId++; + } + } + + try { + writeLh.close(); + fail("should not be able to close the first LedgerHandler as a recovery has been performed"); + } catch (BKException.BKMetadataVersionException expected) { + } + + } + } }
