ivankelly commented on a change in pull request #1233: Reduce running time for testLedgerCreateAdvWithLedgerIdInLoop
URL: https://github.com/apache/bookkeeper/pull/1233#discussion_r172818962
 
 

 ##########
 File path: bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
 ##########
 @@ -579,45 +584,58 @@ public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
      */
     @Test
     public void testLedgerCreateAdvWithLedgerIdInLoop() throws Exception {
-        long ledgerId;
         int ledgerCount = 40;
 
-        List<List<byte[]>> entryList = new ArrayList<List<byte[]>>();
-        LedgerHandle[] lhArray = new LedgerHandle[ledgerCount];
-
-        List<byte[]> tmpEntry;
-        for (int lc = 0; lc < ledgerCount; lc++) {
-            tmpEntry = new ArrayList<byte[]>();
-
-            ledgerId = rng.nextLong();
-            ledgerId &= Long.MAX_VALUE;
-            if (!baseConf.getLedgerManagerFactoryClass().equals(LongHierarchicalLedgerManagerFactory.class)) {
-                // since LongHierarchicalLedgerManager supports ledgerIds of decimal length upto 19 digits but other
-                // LedgerManagers only upto 10 decimals
-                ledgerId %= 9999999999L;
-            }
-
-            LOG.info("Iteration: {}  LedgerId: {}", lc, ledgerId);
-            lh = bkc.createLedgerAdv(ledgerId, 5, 3, 2, digestType, ledgerPassword, null);
-            lhArray[lc] = lh;
-
-            for (int i = 0; i < numEntriesToWrite; i++) {
-                ByteBuffer entry = ByteBuffer.allocate(4);
-                entry.putInt(rng.nextInt(maxInt));
-                entry.position(0);
-                tmpEntry.add(entry.array());
-                lh.addEntry(i, entry.array());
-            }
-            entryList.add(tmpEntry);
-        }
-        for (int lc = 0; lc < ledgerCount; lc++) {
-            // Read and verify
-            long lid = lhArray[lc].getId();
-            LOG.info("readEntries for lc: {} ledgerId: {} ", lc, lhArray[lc].getId());
-            readEntries(lhArray[lc], entryList.get(lc));
-            lhArray[lc].close();
-            bkc.deleteLedger(lid);
+        long maxId = Long.MAX_VALUE;
+        if (!baseConf.getLedgerManagerFactoryClass().equals(LongHierarchicalLedgerManagerFactory.class)) {
+            // since LongHierarchicalLedgerManager supports ledgerIds of decimal length upto 19 digits but other
+            // LedgerManagers only upto 10 decimals
+            maxId = 9999999999L;
         }
+
+        rng.longs(ledgerCount, 1, maxId) // generate a stream of ledger ids
+            .mapToObj(ledgerId -> { // create a ledger for each ledger id
+                    LOG.info("Creating adv ledger with id {}", ledgerId);
+                    return bkc.newCreateLedgerOp()
+                        .withEnsembleSize(1).withWriteQuorumSize(1).withAckQuorumSize(1)
+                        .withDigestType(org.apache.bookkeeper.client.api.DigestType.CRC32)
+                        .withPassword(ledgerPassword).makeAdv().withLedgerId(ledgerId)
+                        .execute()
+                        .thenApply(writer -> { // Add entries to ledger when created
+                                LOG.info("Writing stream of {} entries to {}",
+                                         numEntriesToWrite, ledgerId);
+                                List<Integer> entries = rng.ints(numEntriesToWrite, 0, maxInt)
+                                    .mapToObj(Integer::valueOf)
+                                    .collect(Collectors.toList());
+                                CompletableFuture<?> lastRequest = null;
+                                for (int i = 0; i < entries.size(); i++) {
+                                    ByteBuf entry = Unpooled.buffer(4);
+                                    entry.writeInt(entries.get(i));
+                                    LOG.info("Writing {}:{} as {}", ledgerId, i, entries.get(i));
+                                    lastRequest = writer.write(i, entry);
+                                }
+                                lastRequest.join();
+                                return Pair.of(writer, entries);
+                            });
+                })
+            .parallel().map(CompletableFuture::join) // wait for all creations and adds in parallel
+            .forEach(e -> { // check that each set of adds succeeded
 
 Review comment:
   Every operation on the stream after the parallel() runs in parallel, so the reading is parallel too. Regarding the JUnit assert: at its core an assertion failure is a Throwable. parallel() runs the operations in a ForkJoinPool, and if an operation throws a throwable, the join rethrows it in the main thread. So if you assert inside a parallel operation, the failure gets thrown up into the main thread. I've experimented with this a bit to confirm that this is the case.
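
   The behaviour can be demonstrated with a minimal standalone sketch (not from the PR; class and method names here are made up for illustration). An AssertionError thrown by a lambda running on a ForkJoinPool worker surfaces in the thread that invoked the terminal operation:

   ```java
   import java.util.stream.IntStream;

   public class ParallelAssertDemo {

       // Runs a parallel stream in which one element fails an assertion,
       // and reports how the failure surfaced in the calling thread.
       static String run() {
           try {
               // Every operation after parallel() executes on the common ForkJoinPool.
               IntStream.range(0, 8).parallel().forEach(i -> {
                   if (i == 3) {
                       // Simulates a JUnit assert failing in a worker thread;
                       // an assertion failure is just a Throwable.
                       throw new AssertionError("failed on " + i);
                   }
               });
               return "no failure";
           } catch (AssertionError e) {
               // The terminal operation rethrows the worker's throwable
               // in the thread that invoked it.
               return "caught in main thread: " + e.getMessage();
           }
       }

       public static void main(String[] args) {
           System.out.println(run());
       }
   }
   ```

   Running it prints that the failure was caught in the main thread, which is why asserting inside the parallel forEach in the test is safe.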

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
