This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 7d93236 Reduce running time for testLedgerCreateAdvWithLedgerIdInLoop
7d93236 is described below
commit 7d932365eb942e0dde03d4bd73ee4500576afd0d
Author: Ivan Kelly <[email protected]>
AuthorDate: Thu Mar 8 22:19:39 2018 -0800
Reduce running time for testLedgerCreateAdvWithLedgerIdInLoop
This test has flaked in CI regularly mainly because it takes a long
time to run.
This changes reduces the running time by.
1. Creating the ledgers with a ensemble size of 1
2. Creating and writing to the ledgers in parallel
3. Using async methods to write entries to the ledgers
This brought the test from ~60 seconds to ~7 seconds locally.
Author: Ivan Kelly <[email protected]>
Reviewers: Charan Reddy Guttapalem <[email protected]>, Sijie Guo
<[email protected]>
This closes #1233 from ivankelly/ledger-create-adv-timeout
---
.../bookkeeper/client/BookieWriteLedgerTest.java | 114 ++++++++++++++-------
1 file changed, 76 insertions(+), 38 deletions(-)
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
index cab3f8a..9a63e70 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
@@ -42,16 +42,21 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.BKException.BKLedgerClosedException;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.api.WriteAdvHandle;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LongHierarchicalLedgerManagerFactory;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.zookeeper.KeeperException;
import org.junit.Assert;
import org.junit.Before;
@@ -95,7 +100,7 @@ public class BookieWriteLedgerTest extends
@Before
public void setUp() throws Exception {
super.setUp();
- rng = new Random(System.currentTimeMillis()); // Initialize the Random
+ rng = new Random(0); // Initialize the Random
// Number Generator
entries1 = new ArrayList<byte[]>(); // initialize the entries list
entries2 = new ArrayList<byte[]>(); // initialize the entries list
@@ -570,7 +575,6 @@ public class BookieWriteLedgerTest extends
bkc.deleteLedger(ledgerId);
}
-
/**
* In a loop create/write/delete the ledger with same ledgerId through
* the functionality of Advanced Ledger which accepts ledgerId as input.
@@ -579,45 +583,65 @@ public class BookieWriteLedgerTest extends
*/
@Test
public void testLedgerCreateAdvWithLedgerIdInLoop() throws Exception {
- long ledgerId;
int ledgerCount = 40;
- List<List<byte[]>> entryList = new ArrayList<List<byte[]>>();
- LedgerHandle[] lhArray = new LedgerHandle[ledgerCount];
-
- List<byte[]> tmpEntry;
- for (int lc = 0; lc < ledgerCount; lc++) {
- tmpEntry = new ArrayList<byte[]>();
-
- ledgerId = rng.nextLong();
- ledgerId &= Long.MAX_VALUE;
- if
(!baseConf.getLedgerManagerFactoryClass().equals(LongHierarchicalLedgerManagerFactory.class))
{
- // since LongHierarchicalLedgerManager supports ledgerIds of
decimal length upto 19 digits but other
- // LedgerManagers only upto 10 decimals
- ledgerId %= 9999999999L;
- }
-
- LOG.info("Iteration: {} LedgerId: {}", lc, ledgerId);
- lh = bkc.createLedgerAdv(ledgerId, 5, 3, 2, digestType,
ledgerPassword, null);
- lhArray[lc] = lh;
-
- for (int i = 0; i < numEntriesToWrite; i++) {
- ByteBuffer entry = ByteBuffer.allocate(4);
- entry.putInt(rng.nextInt(maxInt));
- entry.position(0);
- tmpEntry.add(entry.array());
- lh.addEntry(i, entry.array());
- }
- entryList.add(tmpEntry);
- }
- for (int lc = 0; lc < ledgerCount; lc++) {
- // Read and verify
- long lid = lhArray[lc].getId();
- LOG.info("readEntries for lc: {} ledgerId: {} ", lc,
lhArray[lc].getId());
- readEntries(lhArray[lc], entryList.get(lc));
- lhArray[lc].close();
- bkc.deleteLedger(lid);
+ long maxId = 9999999999L;
+ if
(baseConf.getLedgerManagerFactoryClass().equals(LongHierarchicalLedgerManagerFactory.class))
{
+ // since LongHierarchicalLedgerManager supports ledgerIds of
decimal length upto 19 digits but other
+ // LedgerManagers only upto 10 decimals
+ maxId = Long.MAX_VALUE;
}
+
+ rng.longs(ledgerCount, 0, maxId) // generate a stream of ledger ids
+ .mapToObj(ledgerId -> { // create a ledger for each ledger id
+ LOG.info("Creating adv ledger with id {}", ledgerId);
+ return bkc.newCreateLedgerOp()
+
.withEnsembleSize(1).withWriteQuorumSize(1).withAckQuorumSize(1)
+
.withDigestType(org.apache.bookkeeper.client.api.DigestType.CRC32)
+
.withPassword(ledgerPassword).makeAdv().withLedgerId(ledgerId)
+ .execute()
+ .thenApply(writer -> { // Add entries to ledger when
created
+ LOG.info("Writing stream of {} entries to {}",
+ numEntriesToWrite, ledgerId);
+ List<ByteBuf> entries =
rng.ints(numEntriesToWrite, 0, maxInt)
+ .mapToObj(i -> {
+ ByteBuf entry = Unpooled.buffer(4);
+ entry.retain();
+ entry.writeInt(i);
+ return entry;
+ })
+ .collect(Collectors.toList());
+ CompletableFuture<?> lastRequest = null;
+ int i = 0;
+ for (ByteBuf entry : entries) {
+ long entryId = i++;
+ LOG.info("Writing {}:{} as {}",
+ ledgerId, entryId,
entry.slice().readInt());
+ lastRequest = writer.write(entryId, entry);
+ }
+ lastRequest.join();
+ return Pair.of(writer, entries);
+ });
+ })
+ .parallel().map(CompletableFuture::join) // wait for all creations
and adds in parallel
+ .forEach(e -> { // check that each set of adds succeeded
+ try {
+ WriteAdvHandle handle = e.getLeft();
+ List<ByteBuf> entries = e.getRight();
+ // Read and verify
+ LOG.info("Read entries for ledger: {}",
handle.getId());
+ readEntries(handle, entries);
+ entries.forEach(ByteBuf::release);
+ handle.close();
+ bkc.deleteLedger(handle.getId());
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ Assert.fail("Test interrupted");
+ } catch (Exception ex) {
+ LOG.info("Readback failed with exception", ex);
+ Assert.fail("Readback failed " + ex.getMessage());
+ }
+ });
}
/**
@@ -1191,6 +1215,20 @@ public class BookieWriteLedgerTest extends
}
}
+ private void readEntries(ReadHandle reader, List<ByteBuf> entries) throws
Exception {
+ assertEquals("Not enough entries in ledger " + reader.getId(),
+ reader.getLastAddConfirmed(), entries.size() - 1);
+ try (LedgerEntries readEntries = reader.read(0,
reader.getLastAddConfirmed()).join()) {
+ int i = 0;
+ for (org.apache.bookkeeper.client.api.LedgerEntry e : readEntries)
{
+ int entryId = i++;
+ ByteBuf origEntry = entries.get(entryId);
+ ByteBuf readEntry = e.getEntryBuffer();
+ assertEquals("Unexpected contents in " + reader.getId() + ":"
+ entryId, origEntry, readEntry);
+ }
+ }
+ }
+
private void readEntriesAndValidateDataArray(LedgerHandle lh, List<byte[]>
entries)
throws InterruptedException, BKException {
ls = lh.readEntries(0, entries.size() - 1);
--
To stop receiving notification emails like this one, please contact
[email protected].