sijie closed pull request #1233: Reduce running time for testLedgerCreateAdvWithLedgerIdInLoop
URL: https://github.com/apache/bookkeeper/pull/1233
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
index cab3f8ac6..9a63e7081 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
@@ -42,16 +42,21 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.BKException.BKLedgerClosedException;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.api.WriteAdvHandle;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LongHierarchicalLedgerManagerFactory;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.zookeeper.KeeperException;
 import org.junit.Assert;
 import org.junit.Before;
@@ -95,7 +100,7 @@ public SyncObj() {
     @Before
     public void setUp() throws Exception {
         super.setUp();
-        rng = new Random(System.currentTimeMillis()); // Initialize the Random
+        rng = new Random(0); // Initialize the Random
         // Number Generator
         entries1 = new ArrayList<byte[]>(); // initialize the entries list
         entries2 = new ArrayList<byte[]>(); // initialize the entries list
@@ -570,7 +575,6 @@ public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
         bkc.deleteLedger(ledgerId);
     }
 
-
     /**
      * In a loop create/write/delete the ledger with same ledgerId through
      * the functionality of Advanced Ledger which accepts ledgerId as input.
@@ -579,45 +583,65 @@ public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
      */
     @Test
     public void testLedgerCreateAdvWithLedgerIdInLoop() throws Exception {
-        long ledgerId;
         int ledgerCount = 40;
 
-        List<List<byte[]>> entryList = new ArrayList<List<byte[]>>();
-        LedgerHandle[] lhArray = new LedgerHandle[ledgerCount];
-
-        List<byte[]> tmpEntry;
-        for (int lc = 0; lc < ledgerCount; lc++) {
-            tmpEntry = new ArrayList<byte[]>();
-
-            ledgerId = rng.nextLong();
-            ledgerId &= Long.MAX_VALUE;
-            if 
(!baseConf.getLedgerManagerFactoryClass().equals(LongHierarchicalLedgerManagerFactory.class))
 {
-                // since LongHierarchicalLedgerManager supports ledgerIds of 
decimal length upto 19 digits but other
-                // LedgerManagers only upto 10 decimals
-                ledgerId %= 9999999999L;
-            }
-
-            LOG.info("Iteration: {}  LedgerId: {}", lc, ledgerId);
-            lh = bkc.createLedgerAdv(ledgerId, 5, 3, 2, digestType, 
ledgerPassword, null);
-            lhArray[lc] = lh;
-
-            for (int i = 0; i < numEntriesToWrite; i++) {
-                ByteBuffer entry = ByteBuffer.allocate(4);
-                entry.putInt(rng.nextInt(maxInt));
-                entry.position(0);
-                tmpEntry.add(entry.array());
-                lh.addEntry(i, entry.array());
-            }
-            entryList.add(tmpEntry);
-        }
-        for (int lc = 0; lc < ledgerCount; lc++) {
-            // Read and verify
-            long lid = lhArray[lc].getId();
-            LOG.info("readEntries for lc: {} ledgerId: {} ", lc, 
lhArray[lc].getId());
-            readEntries(lhArray[lc], entryList.get(lc));
-            lhArray[lc].close();
-            bkc.deleteLedger(lid);
+        long maxId = 9999999999L;
+        if 
(baseConf.getLedgerManagerFactoryClass().equals(LongHierarchicalLedgerManagerFactory.class))
 {
+            // since LongHierarchicalLedgerManager supports ledgerIds of 
decimal length upto 19 digits but other
+            // LedgerManagers only upto 10 decimals
+            maxId = Long.MAX_VALUE;
         }
+
+        rng.longs(ledgerCount, 0, maxId) // generate a stream of ledger ids
+            .mapToObj(ledgerId -> { // create a ledger for each ledger id
+                    LOG.info("Creating adv ledger with id {}", ledgerId);
+                    return bkc.newCreateLedgerOp()
+                        
.withEnsembleSize(1).withWriteQuorumSize(1).withAckQuorumSize(1)
+                        
.withDigestType(org.apache.bookkeeper.client.api.DigestType.CRC32)
+                        
.withPassword(ledgerPassword).makeAdv().withLedgerId(ledgerId)
+                        .execute()
+                        .thenApply(writer -> { // Add entries to ledger when 
created
+                                LOG.info("Writing stream of {} entries to {}",
+                                         numEntriesToWrite, ledgerId);
+                                List<ByteBuf> entries = 
rng.ints(numEntriesToWrite, 0, maxInt)
+                                    .mapToObj(i -> {
+                                            ByteBuf entry = Unpooled.buffer(4);
+                                            entry.retain();
+                                            entry.writeInt(i);
+                                            return entry;
+                                        })
+                                    .collect(Collectors.toList());
+                                CompletableFuture<?> lastRequest = null;
+                                int i = 0;
+                                for (ByteBuf entry : entries) {
+                                    long entryId = i++;
+                                    LOG.info("Writing {}:{} as {}",
+                                             ledgerId, entryId, 
entry.slice().readInt());
+                                    lastRequest = writer.write(entryId, entry);
+                                }
+                                lastRequest.join();
+                                return Pair.of(writer, entries);
+                            });
+                })
+            .parallel().map(CompletableFuture::join) // wait for all creations 
and adds in parallel
+            .forEach(e -> { // check that each set of adds succeeded
+                    try {
+                        WriteAdvHandle handle = e.getLeft();
+                        List<ByteBuf> entries = e.getRight();
+                        // Read and verify
+                        LOG.info("Read entries for ledger: {}", 
handle.getId());
+                        readEntries(handle, entries);
+                        entries.forEach(ByteBuf::release);
+                        handle.close();
+                        bkc.deleteLedger(handle.getId());
+                    } catch (InterruptedException ie) {
+                        Thread.currentThread().interrupt();
+                        Assert.fail("Test interrupted");
+                    } catch (Exception ex) {
+                        LOG.info("Readback failed with exception", ex);
+                        Assert.fail("Readback failed " + ex.getMessage());
+                    }
+                });
     }
 
     /**
@@ -1191,6 +1215,20 @@ private void readEntries(LedgerHandle lh, List<byte[]> entries) throws Interrupt
         }
     }
 
+    private void readEntries(ReadHandle reader, List<ByteBuf> entries) throws 
Exception {
+        assertEquals("Not enough entries in ledger " + reader.getId(),
+                     reader.getLastAddConfirmed(), entries.size() - 1);
+        try (LedgerEntries readEntries = reader.read(0, 
reader.getLastAddConfirmed()).join()) {
+            int i = 0;
+            for (org.apache.bookkeeper.client.api.LedgerEntry e : readEntries) 
{
+                int entryId = i++;
+                ByteBuf origEntry = entries.get(entryId);
+                ByteBuf readEntry = e.getEntryBuffer();
+                assertEquals("Unexpected contents in " + reader.getId() + ":" 
+ entryId, origEntry, readEntry);
+            }
+        }
+    }
+
     private void readEntriesAndValidateDataArray(LedgerHandle lh, List<byte[]> 
entries)
             throws InterruptedException, BKException {
         ls = lh.readEntries(0, entries.size() - 1);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to