This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d93236  Reduce running time for testLedgerCreateAdvWithLedgerIdInLoop
7d93236 is described below

commit 7d932365eb942e0dde03d4bd73ee4500576afd0d
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Thu Mar 8 22:19:39 2018 -0800

    Reduce running time for testLedgerCreateAdvWithLedgerIdInLoop
    
    This test has flaked in CI regularly mainly because it takes a long
    time to run.
    
    This changes reduces the running time by.
    1. Creating the ledgers with a ensemble size of 1
    2. Creating and writing to the ledgers in parallel
    3. Using async methods to write entries to the ledgers
    
    This brought the test from ~60 seconds to ~7 seconds locally.
    
    Author: Ivan Kelly <iv...@apache.org>
    
    Reviewers: Charan Reddy Guttapalem <reddychara...@gmail.com>, Sijie Guo 
<si...@apache.org>
    
    This closes #1233 from ivankelly/ledger-create-adv-timeout
---
 .../bookkeeper/client/BookieWriteLedgerTest.java   | 114 ++++++++++++++-------
 1 file changed, 76 insertions(+), 38 deletions(-)

diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
index cab3f8a..9a63e70 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
@@ -42,16 +42,21 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.BKException.BKLedgerClosedException;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.api.WriteAdvHandle;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LongHierarchicalLedgerManagerFactory;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.zookeeper.KeeperException;
 import org.junit.Assert;
 import org.junit.Before;
@@ -95,7 +100,7 @@ public class BookieWriteLedgerTest extends
     @Before
     public void setUp() throws Exception {
         super.setUp();
-        rng = new Random(System.currentTimeMillis()); // Initialize the Random
+        rng = new Random(0); // Initialize the Random
         // Number Generator
         entries1 = new ArrayList<byte[]>(); // initialize the entries list
         entries2 = new ArrayList<byte[]>(); // initialize the entries list
@@ -570,7 +575,6 @@ public class BookieWriteLedgerTest extends
         bkc.deleteLedger(ledgerId);
     }
 
-
     /**
      * In a loop create/write/delete the ledger with same ledgerId through
      * the functionality of Advanced Ledger which accepts ledgerId as input.
@@ -579,45 +583,65 @@ public class BookieWriteLedgerTest extends
      */
     @Test
     public void testLedgerCreateAdvWithLedgerIdInLoop() throws Exception {
-        long ledgerId;
         int ledgerCount = 40;
 
-        List<List<byte[]>> entryList = new ArrayList<List<byte[]>>();
-        LedgerHandle[] lhArray = new LedgerHandle[ledgerCount];
-
-        List<byte[]> tmpEntry;
-        for (int lc = 0; lc < ledgerCount; lc++) {
-            tmpEntry = new ArrayList<byte[]>();
-
-            ledgerId = rng.nextLong();
-            ledgerId &= Long.MAX_VALUE;
-            if 
(!baseConf.getLedgerManagerFactoryClass().equals(LongHierarchicalLedgerManagerFactory.class))
 {
-                // since LongHierarchicalLedgerManager supports ledgerIds of 
decimal length upto 19 digits but other
-                // LedgerManagers only upto 10 decimals
-                ledgerId %= 9999999999L;
-            }
-
-            LOG.info("Iteration: {}  LedgerId: {}", lc, ledgerId);
-            lh = bkc.createLedgerAdv(ledgerId, 5, 3, 2, digestType, 
ledgerPassword, null);
-            lhArray[lc] = lh;
-
-            for (int i = 0; i < numEntriesToWrite; i++) {
-                ByteBuffer entry = ByteBuffer.allocate(4);
-                entry.putInt(rng.nextInt(maxInt));
-                entry.position(0);
-                tmpEntry.add(entry.array());
-                lh.addEntry(i, entry.array());
-            }
-            entryList.add(tmpEntry);
-        }
-        for (int lc = 0; lc < ledgerCount; lc++) {
-            // Read and verify
-            long lid = lhArray[lc].getId();
-            LOG.info("readEntries for lc: {} ledgerId: {} ", lc, 
lhArray[lc].getId());
-            readEntries(lhArray[lc], entryList.get(lc));
-            lhArray[lc].close();
-            bkc.deleteLedger(lid);
+        long maxId = 9999999999L;
+        if 
(baseConf.getLedgerManagerFactoryClass().equals(LongHierarchicalLedgerManagerFactory.class))
 {
+            // since LongHierarchicalLedgerManager supports ledgerIds of 
decimal length upto 19 digits but other
+            // LedgerManagers only upto 10 decimals
+            maxId = Long.MAX_VALUE;
         }
+
+        rng.longs(ledgerCount, 0, maxId) // generate a stream of ledger ids
+            .mapToObj(ledgerId -> { // create a ledger for each ledger id
+                    LOG.info("Creating adv ledger with id {}", ledgerId);
+                    return bkc.newCreateLedgerOp()
+                        
.withEnsembleSize(1).withWriteQuorumSize(1).withAckQuorumSize(1)
+                        
.withDigestType(org.apache.bookkeeper.client.api.DigestType.CRC32)
+                        
.withPassword(ledgerPassword).makeAdv().withLedgerId(ledgerId)
+                        .execute()
+                        .thenApply(writer -> { // Add entries to ledger when 
created
+                                LOG.info("Writing stream of {} entries to {}",
+                                         numEntriesToWrite, ledgerId);
+                                List<ByteBuf> entries = 
rng.ints(numEntriesToWrite, 0, maxInt)
+                                    .mapToObj(i -> {
+                                            ByteBuf entry = Unpooled.buffer(4);
+                                            entry.retain();
+                                            entry.writeInt(i);
+                                            return entry;
+                                        })
+                                    .collect(Collectors.toList());
+                                CompletableFuture<?> lastRequest = null;
+                                int i = 0;
+                                for (ByteBuf entry : entries) {
+                                    long entryId = i++;
+                                    LOG.info("Writing {}:{} as {}",
+                                             ledgerId, entryId, 
entry.slice().readInt());
+                                    lastRequest = writer.write(entryId, entry);
+                                }
+                                lastRequest.join();
+                                return Pair.of(writer, entries);
+                            });
+                })
+            .parallel().map(CompletableFuture::join) // wait for all creations 
and adds in parallel
+            .forEach(e -> { // check that each set of adds succeeded
+                    try {
+                        WriteAdvHandle handle = e.getLeft();
+                        List<ByteBuf> entries = e.getRight();
+                        // Read and verify
+                        LOG.info("Read entries for ledger: {}", 
handle.getId());
+                        readEntries(handle, entries);
+                        entries.forEach(ByteBuf::release);
+                        handle.close();
+                        bkc.deleteLedger(handle.getId());
+                    } catch (InterruptedException ie) {
+                        Thread.currentThread().interrupt();
+                        Assert.fail("Test interrupted");
+                    } catch (Exception ex) {
+                        LOG.info("Readback failed with exception", ex);
+                        Assert.fail("Readback failed " + ex.getMessage());
+                    }
+                });
     }
 
     /**
@@ -1191,6 +1215,20 @@ public class BookieWriteLedgerTest extends
         }
     }
 
+    private void readEntries(ReadHandle reader, List<ByteBuf> entries) throws 
Exception {
+        assertEquals("Not enough entries in ledger " + reader.getId(),
+                     reader.getLastAddConfirmed(), entries.size() - 1);
+        try (LedgerEntries readEntries = reader.read(0, 
reader.getLastAddConfirmed()).join()) {
+            int i = 0;
+            for (org.apache.bookkeeper.client.api.LedgerEntry e : readEntries) 
{
+                int entryId = i++;
+                ByteBuf origEntry = entries.get(entryId);
+                ByteBuf readEntry = e.getEntryBuffer();
+                assertEquals("Unexpected contents in " + reader.getId() + ":" 
+ entryId, origEntry, readEntry);
+            }
+        }
+    }
+
     private void readEntriesAndValidateDataArray(LedgerHandle lh, List<byte[]> 
entries)
             throws InterruptedException, BKException {
         ls = lh.readEntries(0, entries.size() - 1);

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.

Reply via email to