reddycharan commented on a change in pull request #1233: Reduce running time for testLedgerCreateAdvWithLedgerIdInLoop
URL: https://github.com/apache/bookkeeper/pull/1233#discussion_r172754140
##########
File path:
bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
##########
@@ -579,45 +584,58 @@ public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
*/
@Test
public void testLedgerCreateAdvWithLedgerIdInLoop() throws Exception {
- long ledgerId;
int ledgerCount = 40;
- List<List<byte[]>> entryList = new ArrayList<List<byte[]>>();
- LedgerHandle[] lhArray = new LedgerHandle[ledgerCount];
-
- List<byte[]> tmpEntry;
- for (int lc = 0; lc < ledgerCount; lc++) {
- tmpEntry = new ArrayList<byte[]>();
-
- ledgerId = rng.nextLong();
- ledgerId &= Long.MAX_VALUE;
- if (!baseConf.getLedgerManagerFactoryClass().equals(LongHierarchicalLedgerManagerFactory.class)) {
- // since LongHierarchicalLedgerManager supports ledgerIds of decimal length upto 19 digits but other
- // LedgerManagers only upto 10 decimals
- ledgerId %= 9999999999L;
- }
-
- LOG.info("Iteration: {} LedgerId: {}", lc, ledgerId);
- lh = bkc.createLedgerAdv(ledgerId, 5, 3, 2, digestType, ledgerPassword, null);
- lhArray[lc] = lh;
-
- for (int i = 0; i < numEntriesToWrite; i++) {
- ByteBuffer entry = ByteBuffer.allocate(4);
- entry.putInt(rng.nextInt(maxInt));
- entry.position(0);
- tmpEntry.add(entry.array());
- lh.addEntry(i, entry.array());
- }
- entryList.add(tmpEntry);
- }
- for (int lc = 0; lc < ledgerCount; lc++) {
- // Read and verify
- long lid = lhArray[lc].getId();
- LOG.info("readEntries for lc: {} ledgerId: {} ", lc, lhArray[lc].getId());
- readEntries(lhArray[lc], entryList.get(lc));
- lhArray[lc].close();
- bkc.deleteLedger(lid);
+ long maxId = Long.MAX_VALUE;
+ if (!baseConf.getLedgerManagerFactoryClass().equals(LongHierarchicalLedgerManagerFactory.class)) {
+ // since LongHierarchicalLedgerManager supports ledgerIds of decimal length upto 19 digits but other
+ // LedgerManagers only upto 10 decimals
+ maxId = 9999999999L;
}
+
+ rng.longs(ledgerCount, 1, maxId) // generate a stream of ledger ids
+ .mapToObj(ledgerId -> { // create a ledger for each ledger id
+ LOG.info("Creating adv ledger with id {}", ledgerId);
+ return bkc.newCreateLedgerOp()
+ .withEnsembleSize(1).withWriteQuorumSize(1).withAckQuorumSize(1)
+ .withDigestType(org.apache.bookkeeper.client.api.DigestType.CRC32)
+ .withPassword(ledgerPassword).makeAdv().withLedgerId(ledgerId)
+ .execute()
+ .thenApply(writer -> { // Add entries to ledger when created
+ LOG.info("Writing stream of {} entries to {}",
+ numEntriesToWrite, ledgerId);
+ List<Integer> entries = rng.ints(numEntriesToWrite, 0, maxInt)
+ .mapToObj(Integer::valueOf)
+ .collect(Collectors.toList());
+ CompletableFuture<?> lastRequest = null;
+ for (int i = 0; i < entries.size(); i++) {
+ ByteBuf entry = Unpooled.buffer(4);
+ entry.writeInt(entries.get(i));
+ LOG.info("Writing {}:{} as {}", ledgerId, i, entries.get(i));
+ lastRequest = writer.write(i, entry);
+ }
+ lastRequest.join();
+ return Pair.of(writer, entries);
+ });
+ })
+ .parallel().map(CompletableFuture::join) // wait for all creations and adds in parallel
+ .forEach(e -> { // check that each set of adds succeeded
Review comment:
I know JUnit asserts would work only in the main thread, and that's why you are
using forEach. But I am wondering whether you could run the read operations in
the parallel stream as well, store the status of each operation, and then
validate those statuses with JUnit asserts in the final forEach loop.
Since you parallelized the write operations, why not the read operations as well?
Just a thought; it's not a blocker for me.
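
The suggestion above could be sketched roughly as follows. This is a minimal, self-contained illustration and not the PR's code: `ParallelReadCheck`, `STORE`, `readBack`, and `readAllInParallel` are hypothetical names, and an in-memory map stands in for the ledgers so the sketch runs without a BookKeeper cluster. The reads run in a parallel stream and only record a per-ledger status; the checks then run sequentially in the calling thread.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

public class ParallelReadCheck {
    // Hypothetical stand-in for the ledgers: ledger id -> entries as stored.
    static final Map<Long, List<Integer>> STORE = new ConcurrentHashMap<>();

    // Hypothetical read operation; returns null if the ledger is unknown.
    static List<Integer> readBack(long ledgerId) {
        return STORE.get(ledgerId);
    }

    // Reads every ledger in a parallel stream and records whether the data
    // read back matches what was written. No assertions are made here.
    static Map<Long, Boolean> readAllInParallel(Map<Long, List<Integer>> expected) {
        return expected.entrySet().parallelStream()
            .collect(Collectors.toConcurrentMap(
                Map.Entry::getKey,
                e -> e.getValue().equals(readBack(e.getKey()))));
    }

    public static void main(String[] args) {
        Map<Long, List<Integer>> expected =
            Map.of(1L, List.of(10, 20), 2L, List.of(30));
        STORE.putAll(expected);

        Map<Long, Boolean> status = readAllInParallel(expected);

        // Validate sequentially in the calling thread; in a real test this
        // is where the JUnit asserts would go.
        status.forEach((id, ok) -> {
            if (!ok) {
                throw new AssertionError("Mismatch for ledger " + id);
            }
        });
        System.out.println("verified " + status.size() + " ledgers");
    }
}
```

In the actual test, the stand-in `readBack` would be replaced by the real ledger read, and the final loop by the existing assertion helper.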