reddycharan commented on a change in pull request #1233: Reduce running time for testLedgerCreateAdvWithLedgerIdInLoop
URL: https://github.com/apache/bookkeeper/pull/1233#discussion_r172754140
 
 

 ##########
 File path: bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
 ##########
 @@ -579,45 +584,58 @@ public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
      */
     @Test
     public void testLedgerCreateAdvWithLedgerIdInLoop() throws Exception {
-        long ledgerId;
         int ledgerCount = 40;
 
-        List<List<byte[]>> entryList = new ArrayList<List<byte[]>>();
-        LedgerHandle[] lhArray = new LedgerHandle[ledgerCount];
-
-        List<byte[]> tmpEntry;
-        for (int lc = 0; lc < ledgerCount; lc++) {
-            tmpEntry = new ArrayList<byte[]>();
-
-            ledgerId = rng.nextLong();
-            ledgerId &= Long.MAX_VALUE;
-            if (!baseConf.getLedgerManagerFactoryClass().equals(LongHierarchicalLedgerManagerFactory.class)) {
-                // since LongHierarchicalLedgerManager supports ledgerIds of decimal length upto 19 digits but other
-                // LedgerManagers only upto 10 decimals
-                ledgerId %= 9999999999L;
-            }
-
-            LOG.info("Iteration: {}  LedgerId: {}", lc, ledgerId);
-            lh = bkc.createLedgerAdv(ledgerId, 5, 3, 2, digestType, ledgerPassword, null);
-            lhArray[lc] = lh;
-
-            for (int i = 0; i < numEntriesToWrite; i++) {
-                ByteBuffer entry = ByteBuffer.allocate(4);
-                entry.putInt(rng.nextInt(maxInt));
-                entry.position(0);
-                tmpEntry.add(entry.array());
-                lh.addEntry(i, entry.array());
-            }
-            entryList.add(tmpEntry);
-        }
-        for (int lc = 0; lc < ledgerCount; lc++) {
-            // Read and verify
-            long lid = lhArray[lc].getId();
-            LOG.info("readEntries for lc: {} ledgerId: {} ", lc, lhArray[lc].getId());
-            readEntries(lhArray[lc], entryList.get(lc));
-            lhArray[lc].close();
-            bkc.deleteLedger(lid);
+        long maxId = Long.MAX_VALUE;
+        if (!baseConf.getLedgerManagerFactoryClass().equals(LongHierarchicalLedgerManagerFactory.class)) {
+            // since LongHierarchicalLedgerManager supports ledgerIds of decimal length upto 19 digits but other
+            // LedgerManagers only upto 10 decimals
+            maxId = 9999999999L;
         }
+
+        rng.longs(ledgerCount, 1, maxId) // generate a stream of ledger ids
+            .mapToObj(ledgerId -> { // create a ledger for each ledger id
+                    LOG.info("Creating adv ledger with id {}", ledgerId);
+                    return bkc.newCreateLedgerOp()
+                        .withEnsembleSize(1).withWriteQuorumSize(1).withAckQuorumSize(1)
+                        .withDigestType(org.apache.bookkeeper.client.api.DigestType.CRC32)
+                        .withPassword(ledgerPassword).makeAdv().withLedgerId(ledgerId)
+                        .execute()
+                        .thenApply(writer -> { // Add entries to ledger when created
+                                LOG.info("Writing stream of {} entries to {}",
+                                         numEntriesToWrite, ledgerId);
+                                List<Integer> entries = rng.ints(numEntriesToWrite, 0, maxInt)
+                                    .mapToObj(Integer::valueOf)
+                                    .collect(Collectors.toList());
+                                CompletableFuture<?> lastRequest = null;
+                                for (int i = 0; i < entries.size(); i++) {
+                                    ByteBuf entry = Unpooled.buffer(4);
+                                    entry.writeInt(entries.get(i));
+                                    LOG.info("Writing {}:{} as {}", ledgerId, i, entries.get(i));
+                                    lastRequest = writer.write(i, entry);
+                                }
+                                lastRequest.join();
+                                return Pair.of(writer, entries);
+                            });
+                })
+            .parallel().map(CompletableFuture::join) // wait for all creations and adds in parallel
+            .forEach(e -> { // check that each set of adds succeeded
 
 Review comment:
   I know JUnit asserts only work in the main thread, and that's why you are
using forEach. But I wonder whether you could run the read operations in a
parallel stream as well, store the status of each operation, and then validate
those statuses with JUnit asserts in the final forEach loop.
   
   Since you parallelized the write operations, why not the read operations as
well? Just a thought; it's not a blocker for me.
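
   A minimal sketch of what I mean, using plain JDK types only. The names `ParallelReadCheck`, `ReadStatus`, and `readAllInParallel` are hypothetical, and the `reader` function is a stand-in for whatever the test uses to read a ledger back; none of this is BookKeeper API. The reads run in a parallel stream and only record a status; the checks happen afterwards on the calling thread.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical helper, not part of BookKeeper: read in parallel, verify later.
class ParallelReadCheck {

    /** Outcome of reading one ledger back: its id and whether the data matched. */
    static final class ReadStatus {
        final long ledgerId;
        final boolean ok;

        ReadStatus(long ledgerId, boolean ok) {
            this.ledgerId = ledgerId;
            this.ok = ok;
        }
    }

    /**
     * Reads every ledger in parallel and records a status per ledger.
     * No assertion is made here; failures (including exceptions thrown
     * by the reader) are captured as ok == false.
     */
    static List<ReadStatus> readAllInParallel(Map<Long, List<Integer>> expected,
                                              Function<Long, List<Integer>> reader) {
        return expected.entrySet().parallelStream()
            .map(e -> {
                boolean ok;
                try {
                    ok = e.getValue().equals(reader.apply(e.getKey()));
                } catch (RuntimeException ex) {
                    ok = false;
                }
                return new ReadStatus(e.getKey(), ok);
            })
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Assumed test data: ledger id -> entries that were written.
        Map<Long, List<Integer>> written = new HashMap<>();
        written.put(1L, Arrays.asList(10, 20, 30));
        written.put(2L, Arrays.asList(40, 50));

        // Stand-in reader that simply returns what was written.
        List<ReadStatus> statuses = readAllInParallel(written, written::get);

        // Validation happens on the calling thread, after the parallel phase.
        for (ReadStatus s : statuses) {
            if (!s.ok) {
                throw new AssertionError("Read mismatch for ledger " + s.ledgerId);
            }
        }
        System.out.println("Verified " + statuses.size() + " ledgers");
    }
}
```

   In the real test the final loop would be the place for the JUnit asserts, and the status object could carry the failure cause instead of a plain boolean.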
