This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push: new 7d93236 Reduce running time for testLedgerCreateAdvWithLedgerIdInLoop 7d93236 is described below commit 7d932365eb942e0dde03d4bd73ee4500576afd0d Author: Ivan Kelly <iv...@apache.org> AuthorDate: Thu Mar 8 22:19:39 2018 -0800 Reduce running time for testLedgerCreateAdvWithLedgerIdInLoop This test has flaked in CI regularly, mainly because it takes a long time to run. This change reduces the running time by: 1. Creating the ledgers with an ensemble size of 1 2. Creating and writing to the ledgers in parallel 3. Using async methods to write entries to the ledgers This brought the test from ~60 seconds to ~7 seconds locally. Author: Ivan Kelly <iv...@apache.org> Reviewers: Charan Reddy Guttapalem <reddychara...@gmail.com>, Sijie Guo <si...@apache.org> This closes #1233 from ivankelly/ledger-create-adv-timeout --- .../bookkeeper/client/BookieWriteLedgerTest.java | 114 ++++++++++++++------- 1 file changed, 76 insertions(+), 38 deletions(-) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java index cab3f8a..9a63e70 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java @@ -42,16 +42,21 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; import org.apache.bookkeeper.bookie.Bookie; import org.apache.bookkeeper.bookie.BookieException; import org.apache.bookkeeper.client.AsyncCallback.AddCallback; import org.apache.bookkeeper.client.BKException.BKLedgerClosedException; import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.client.api.LedgerEntries; 
+import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.client.api.WriteAdvHandle; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.meta.LongHierarchicalLedgerManagerFactory; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; +import org.apache.commons.lang3.tuple.Pair; import org.apache.zookeeper.KeeperException; import org.junit.Assert; import org.junit.Before; @@ -95,7 +100,7 @@ public class BookieWriteLedgerTest extends @Before public void setUp() throws Exception { super.setUp(); - rng = new Random(System.currentTimeMillis()); // Initialize the Random + rng = new Random(0); // Initialize the Random // Number Generator entries1 = new ArrayList<byte[]>(); // initialize the entries list entries2 = new ArrayList<byte[]>(); // initialize the entries list @@ -570,7 +575,6 @@ public class BookieWriteLedgerTest extends bkc.deleteLedger(ledgerId); } - /** * In a loop create/write/delete the ledger with same ledgerId through * the functionality of Advanced Ledger which accepts ledgerId as input. 
@@ -579,45 +583,65 @@ public class BookieWriteLedgerTest extends */ @Test public void testLedgerCreateAdvWithLedgerIdInLoop() throws Exception { - long ledgerId; int ledgerCount = 40; - List<List<byte[]>> entryList = new ArrayList<List<byte[]>>(); - LedgerHandle[] lhArray = new LedgerHandle[ledgerCount]; - - List<byte[]> tmpEntry; - for (int lc = 0; lc < ledgerCount; lc++) { - tmpEntry = new ArrayList<byte[]>(); - - ledgerId = rng.nextLong(); - ledgerId &= Long.MAX_VALUE; - if (!baseConf.getLedgerManagerFactoryClass().equals(LongHierarchicalLedgerManagerFactory.class)) { - // since LongHierarchicalLedgerManager supports ledgerIds of decimal length upto 19 digits but other - // LedgerManagers only upto 10 decimals - ledgerId %= 9999999999L; - } - - LOG.info("Iteration: {} LedgerId: {}", lc, ledgerId); - lh = bkc.createLedgerAdv(ledgerId, 5, 3, 2, digestType, ledgerPassword, null); - lhArray[lc] = lh; - - for (int i = 0; i < numEntriesToWrite; i++) { - ByteBuffer entry = ByteBuffer.allocate(4); - entry.putInt(rng.nextInt(maxInt)); - entry.position(0); - tmpEntry.add(entry.array()); - lh.addEntry(i, entry.array()); - } - entryList.add(tmpEntry); - } - for (int lc = 0; lc < ledgerCount; lc++) { - // Read and verify - long lid = lhArray[lc].getId(); - LOG.info("readEntries for lc: {} ledgerId: {} ", lc, lhArray[lc].getId()); - readEntries(lhArray[lc], entryList.get(lc)); - lhArray[lc].close(); - bkc.deleteLedger(lid); + long maxId = 9999999999L; + if (baseConf.getLedgerManagerFactoryClass().equals(LongHierarchicalLedgerManagerFactory.class)) { + // since LongHierarchicalLedgerManager supports ledgerIds of decimal length upto 19 digits but other + // LedgerManagers only upto 10 decimals + maxId = Long.MAX_VALUE; } + + rng.longs(ledgerCount, 0, maxId) // generate a stream of ledger ids + .mapToObj(ledgerId -> { // create a ledger for each ledger id + LOG.info("Creating adv ledger with id {}", ledgerId); + return bkc.newCreateLedgerOp() + 
.withEnsembleSize(1).withWriteQuorumSize(1).withAckQuorumSize(1) + .withDigestType(org.apache.bookkeeper.client.api.DigestType.CRC32) + .withPassword(ledgerPassword).makeAdv().withLedgerId(ledgerId) + .execute() + .thenApply(writer -> { // Add entries to ledger when created + LOG.info("Writing stream of {} entries to {}", + numEntriesToWrite, ledgerId); + List<ByteBuf> entries = rng.ints(numEntriesToWrite, 0, maxInt) + .mapToObj(i -> { + ByteBuf entry = Unpooled.buffer(4); + entry.retain(); + entry.writeInt(i); + return entry; + }) + .collect(Collectors.toList()); + CompletableFuture<?> lastRequest = null; + int i = 0; + for (ByteBuf entry : entries) { + long entryId = i++; + LOG.info("Writing {}:{} as {}", + ledgerId, entryId, entry.slice().readInt()); + lastRequest = writer.write(entryId, entry); + } + lastRequest.join(); + return Pair.of(writer, entries); + }); + }) + .parallel().map(CompletableFuture::join) // wait for all creations and adds in parallel + .forEach(e -> { // check that each set of adds succeeded + try { + WriteAdvHandle handle = e.getLeft(); + List<ByteBuf> entries = e.getRight(); + // Read and verify + LOG.info("Read entries for ledger: {}", handle.getId()); + readEntries(handle, entries); + entries.forEach(ByteBuf::release); + handle.close(); + bkc.deleteLedger(handle.getId()); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + Assert.fail("Test interrupted"); + } catch (Exception ex) { + LOG.info("Readback failed with exception", ex); + Assert.fail("Readback failed " + ex.getMessage()); + } + }); } /** @@ -1191,6 +1215,20 @@ public class BookieWriteLedgerTest extends } } + private void readEntries(ReadHandle reader, List<ByteBuf> entries) throws Exception { + assertEquals("Not enough entries in ledger " + reader.getId(), + reader.getLastAddConfirmed(), entries.size() - 1); + try (LedgerEntries readEntries = reader.read(0, reader.getLastAddConfirmed()).join()) { + int i = 0; + for 
(org.apache.bookkeeper.client.api.LedgerEntry e : readEntries) { + int entryId = i++; + ByteBuf origEntry = entries.get(entryId); + ByteBuf readEntry = e.getEntryBuffer(); + assertEquals("Unexpected contents in " + reader.getId() + ":" + entryId, origEntry, readEntry); + } + } + } + private void readEntriesAndValidateDataArray(LedgerHandle lh, List<byte[]> entries) throws InterruptedException, BKException { ls = lh.readEntries(0, entries.size() - 1); -- To stop receiving notification emails like this one, please contact si...@apache.org.