This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new eb27e6ed3c [BP-62] Add more test for Bookkeeper. (#4210)
eb27e6ed3c is described below

commit eb27e6ed3ccd1ee46f372ae6fd0a53ccea81908f
Author: Yan Zhao <[email protected]>
AuthorDate: Wed Feb 21 02:14:12 2024 +0800

    [BP-62] Add more test for Bookkeeper. (#4210)
    
    * Add test to cover Bookkeeper test.
    
    * fix ci.
    
    * Fix ci.
---
 .../benchmark/BenchReadThroughputLatency.java      |  27 ++--
 .../org/apache/bookkeeper/client/LedgerHandle.java |   4 +-
 .../bookkeeper/conf/ClientConfiguration.java       |   5 +
 .../apache/bookkeeper/client/BookKeeperTest.java   | 179 +++++++++++++++++++++
 4 files changed, 204 insertions(+), 11 deletions(-)

diff --git 
a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java
 
b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java
index f3247ab621..5c04558b84 100644
--- 
a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java
+++ 
b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java
@@ -73,7 +73,7 @@ public class BenchReadThroughputLatency {
         }
     };
 
-    private static void readLedger(ClientConfiguration conf, long ledgerId, 
byte[] passwd) {
+    private static void readLedger(ClientConfiguration conf, long ledgerId, 
byte[] passwd, int batchEntries) {
         LOG.info("Reading ledger {}", ledgerId);
         BookKeeper bk = null;
         long time = 0;
@@ -102,17 +102,23 @@ public class BenchReadThroughputLatency {
                 }
                 long starttime = System.nanoTime();
 
-                while (lastRead < lastConfirmed) {
+                while (entriesRead <= lastConfirmed) {
                     long nextLimit = lastRead + 100000;
-                    long readTo = Math.min(nextLimit, lastConfirmed);
-                    Enumeration<LedgerEntry> entries = lh.readEntries(lastRead 
+ 1, readTo);
-                    lastRead = readTo;
+                    Enumeration<LedgerEntry> entries;
+                    if (batchEntries <= 0) {
+                        long readTo = Math.min(nextLimit, lastConfirmed);
+                        entries = lh.readEntries(lastRead + 1, readTo);
+                    } else {
+                        entries = lh.batchReadEntries(lastRead, batchEntries, 
-1);
+                    }
                     while (entries.hasMoreElements()) {
                         LedgerEntry e = entries.nextElement();
                         entriesRead++;
+                        lastRead = e.getEntryId();
                         if ((entriesRead % 10000) == 0) {
-                            LOG.info("{} entries read", entriesRead);
+                            LOG.info("{} entries read from ledger {}", 
entriesRead, ledgerId);
                         }
+                        e.getEntryBuffer().release();
                     }
                 }
                 long endtime = System.nanoTime();
@@ -159,6 +165,8 @@ public class BenchReadThroughputLatency {
         options.addOption("sockettimeout", true, "Socket timeout for 
bookkeeper client. In seconds. Default 5");
         options.addOption("useV2", false, "Whether use V2 protocol to read 
ledgers from the bookie server.");
         options.addOption("help", false, "This message");
+        options.addOption("batchentries", true, "The batch read entries count. 
"
+                + "If the value is greater than 0, uses batch read. Or uses 
the single read. Default 1000");
 
         CommandLineParser parser = new PosixParser();
         CommandLine cmd = parser.parse(options, args);
@@ -171,6 +179,7 @@ public class BenchReadThroughputLatency {
         final String servers = cmd.getOptionValue("zookeeper", 
"localhost:2181");
         final byte[] passwd = cmd.getOptionValue("password", 
"benchPasswd").getBytes(UTF_8);
         final int sockTimeout = 
Integer.parseInt(cmd.getOptionValue("sockettimeout", "5"));
+        final int batchentries = 
Integer.parseInt(cmd.getOptionValue("batchentries", "1000"));
         if (cmd.hasOption("ledger") && cmd.hasOption("listen")) {
             LOG.error("Cannot used -ledger and -listen together");
             usage(options);
@@ -210,7 +219,7 @@ public class BenchReadThroughputLatency {
                         try {
                             if (event.getType() == Event.EventType.NodeCreated
                                        && event.getPath().equals(nodepath)) {
-                                readLedger(conf, ledger.get(), passwd);
+                                readLedger(conf, ledger.get(), passwd, 
batchentries);
                                 shutdownLatch.countDown();
                             } else if (event.getType() == 
Event.EventType.NodeChildrenChanged) {
                                 if (numLedgers.get() < 0) {
@@ -236,7 +245,7 @@ public class BenchReadThroughputLatency {
                                             Thread t = new Thread() {
                                                 @Override
                                                 public void run() {
-                                                    readLedger(conf, ledgerId, 
passwd);
+                                                    readLedger(conf, ledgerId, 
passwd, batchentries);
                                                 }
                                             };
                                             t.start();
@@ -259,7 +268,7 @@ public class BenchReadThroughputLatency {
 
             if (ledger.get() != 0) {
                 if (zk.exists(nodepath, true) != null) {
-                    readLedger(conf, ledger.get(), passwd);
+                    readLedger(conf, ledger.get(), passwd, batchentries);
                     shutdownLatch.countDown();
                 } else {
                     LOG.info("Watching for creation of" + nodepath);
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index d56e10872a..a9698b6c7e 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -651,7 +651,7 @@ public class LedgerHandle implements WriteHandle {
      *          the total entries count.
      * @param maxSize
      *          the total entries size.
-     * @see #asyncBatchReadEntries(long, int, long, boolean, ReadCallback, 
Object)
+     * @see #asyncBatchReadEntries(long, int, long, ReadCallback, Object)
      */
     public Enumeration<LedgerEntry> batchReadEntries(long startEntry, int 
maxCount, long maxSize)
             throws InterruptedException, BKException {
@@ -688,7 +688,7 @@ public class LedgerHandle implements WriteHandle {
     /**
      * Read a sequence of entries synchronously, allowing to read after the 
LastAddConfirmed range.<br>
      * This is the same of
-     * {@link #asyncBatchReadUnconfirmedEntries(long, int, long, boolean, 
ReadCallback, Object) }
+     * {@link #asyncBatchReadUnconfirmedEntries(long, int, long, ReadCallback, 
Object) }
      *
      * @param firstEntry
      *          id of first entry of sequence (included)
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 03eb6d1abd..4b6537bc62 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -2098,6 +2098,11 @@ public class ClientConfiguration extends 
AbstractConfiguration<ClientConfigurati
         return getLong(CLIENT_CONNECT_BOOKIE_UNAVAILABLE_LOG_THROTTLING, 
5_000L);
     }
 
+    public ClientConfiguration setBatchReadEnabled(boolean enable) {
+        setProperty(BATCH_READ_ENABLED, enable);
+        return this;
+    }
+
     public boolean isBatchReadEnabled() {
         return getBoolean(BATCH_READ_ENABLED, true);
     }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
index 8f64c256a4..fd0a2a9b76 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
@@ -610,6 +610,185 @@ public class BookKeeperTest extends 
BookKeeperClusterTestCase {
         }
     }
 
+    @Test
+    public void testBatchReadFailBackToSingleRead1() throws Exception {
+        ClientConfiguration conf = new ClientConfiguration();
+        conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+        int numEntries = 100;
+        byte[] data = "foobar".getBytes();
+        try (BookKeeper bkc = new BookKeeper(conf)) {
+            // basic read/write
+            {
+                long ledgerId;
+                try (LedgerHandle lh = bkc.createLedger(2, 2, 2,
+                        digestType, "testPasswd".getBytes())) {
+                    ledgerId = lh.getId();
+                    for (int i = 0; i < numEntries; i++) {
+                        lh.addEntry(data);
+                    }
+                }
+                try (LedgerHandle lh = bkc.openLedger(ledgerId, digestType, 
"testPasswd".getBytes())) {
+                    assertEquals(numEntries - 1, lh.readLastConfirmed());
+                    //The V3 protocol does not support batch read, so this should 
throw UnsupportedOperationException.
+                    try {
+                        lh.batchReadEntries(0, numEntries, 5 * 1024 * 1024);
+                        fail("Should throw UnsupportedOperationException.");
+                    } catch (UnsupportedOperationException e) {
+                        assertEquals("Unsupported batch read entry operation 
for v3 protocol.", e.getMessage());
+                    }
+                }
+            }
+        }
+
+        try (BookKeeper bkc = new BookKeeper(conf)) {
+            // basic read/write
+            {
+                long ledgerId;
+                try (LedgerHandle lh = bkc.createLedger(3, 2, 2,
+                        digestType, "testPasswd".getBytes())) {
+                    ledgerId = lh.getId();
+                    for (int i = 0; i < numEntries; i++) {
+                        lh.addEntry(data);
+                    }
+                }
+                try (LedgerHandle lh = bkc.openLedger(ledgerId, digestType, 
"testPasswd".getBytes())) {
+                    assertEquals(numEntries - 1, lh.readLastConfirmed());
+                    //The ledger ensemble size is not equal to the write quorum, so 
it falls back to single read, which can also
+                    //read data successfully.
+                    for (Enumeration<LedgerEntry> readEntries = 
lh.batchReadEntries(0, numEntries, 5 * 1024 * 1024);
+                            readEntries.hasMoreElements();) {
+                        LedgerEntry entry = readEntries.nextElement();
+                        assertArrayEquals(data, entry.getEntry());
+                    }
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testBatchReadFailBackToSingleRead2() throws Exception {
+        ClientConfiguration conf = new ClientConfiguration();
+        conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+        int numEntries = 100;
+        byte[] data = "foobar".getBytes();
+        try (BookKeeper bkc = new BookKeeper(conf)) {
+            // basic read/write
+            {
+                long ledgerId;
+                try (LedgerHandle lh = bkc.createLedger(2, 2, 2,
+                        digestType, "testPasswd".getBytes())) {
+                    ledgerId = lh.getId();
+                    for (int i = 0; i < numEntries; i++) {
+                        lh.addEntry(data);
+                    }
+                }
+                try (LedgerHandle lh = bkc.openLedger(ledgerId, digestType, 
"testPasswd".getBytes())) {
+                    assertEquals(numEntries - 1, lh.readLastConfirmed());
+                    //The V3 protocol does not support batch read, so it will throw 
UnsupportedOperationException.
+                    try {
+                        lh.batchReadEntries(0, numEntries, 5 * 1024 * 1024);
+                        fail("Should throw UnsupportedOperationException.");
+                    } catch (UnsupportedOperationException e) {
+                        assertEquals("Unsupported batch read entry operation 
for v3 protocol.", e.getMessage());
+                    }
+                }
+            }
+        }
+
+        conf.setBatchReadEnabled(false);
+        try (BookKeeper bkc = new BookKeeper(conf)) {
+            // basic read/write
+            {
+                long ledgerId;
+                try (LedgerHandle lh = bkc.createLedger(2, 2, 2,
+                        digestType, "testPasswd".getBytes())) {
+                    ledgerId = lh.getId();
+                    for (int i = 0; i < numEntries; i++) {
+                        lh.addEntry(data);
+                    }
+                }
+                try (LedgerHandle lh = bkc.openLedger(ledgerId, digestType, 
"testPasswd".getBytes())) {
+                    assertEquals(numEntries - 1, lh.readLastConfirmed());
+                    //Batch read is disabled in the config, so it falls back to single 
read, which can also
+                    //read data successfully.
+                    for (Enumeration<LedgerEntry> readEntries = 
lh.batchReadEntries(0, numEntries, 5 * 1024 * 1024);
+                            readEntries.hasMoreElements();) {
+                        LedgerEntry entry = readEntries.nextElement();
+                        assertArrayEquals(data, entry.getEntry());
+                    }
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testBatchReadWithV2Protocol() throws Exception {
+        ClientConfiguration conf = new 
ClientConfiguration().setUseV2WireProtocol(true);
+        conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+        int numEntries = 100;
+        byte[] data = "foobar".getBytes();
+        try (BookKeeper bkc = new BookKeeper(conf)) {
+            // basic read/write
+            {
+                long ledgerId;
+                try (LedgerHandle lh = bkc.createLedger(2, 2, 2, digestType, 
"testPasswd".getBytes())) {
+                    ledgerId = lh.getId();
+                    for (int i = 0; i < numEntries; i++) {
+                        lh.addEntry(data);
+                    }
+                }
+                try (LedgerHandle lh = bkc.openLedger(ledgerId, digestType, 
"testPasswd".getBytes())) {
+                    assertEquals(numEntries - 1, lh.readLastConfirmed());
+                    int entries = 0;
+                    for (Enumeration<LedgerEntry> readEntries = 
lh.batchReadEntries(0, numEntries, 5 * 1024 * 1024);
+                            readEntries.hasMoreElements();) {
+                        LedgerEntry entry = readEntries.nextElement();
+                        assertArrayEquals(data, entry.getEntry());
+                        entries++;
+                    }
+                    assertEquals(numEntries, entries);
+
+                    //The maxCount is 0, the result is only limited by maxSize.
+                    entries = 0;
+                    for (Enumeration<LedgerEntry> readEntries = 
lh.batchReadEntries(0, 0, 5 * 1024 * 1024);
+                            readEntries.hasMoreElements();) {
+                        LedgerEntry entry = readEntries.nextElement();
+                        assertArrayEquals(data, entry.getEntry());
+                        entries++;
+                    }
+                    assertEquals(numEntries, entries);
+
+                    // one entry size = 8(ledgerId) + 8(entryId) + 8(lac) + 
8(length) + 8(digest) + payload size
+                    long entrySize = 8 + 8 + 8 + 8 + 8 + data.length;
+                    //response header size.
+                    int headerSize = 24 + 8 + 4;
+                    //The maxCount is 0, the result is only limited by maxSize.
+                    entries = 0;
+                    int expectEntriesNum = 5;
+                    for (Enumeration<LedgerEntry> readEntries = 
lh.batchReadEntries(0, 0,
+                            expectEntriesNum * entrySize + headerSize + 
(expectEntriesNum * 4));
+                            readEntries.hasMoreElements();) {
+                        LedgerEntry entry = readEntries.nextElement();
+                        assertArrayEquals(data, entry.getEntry());
+                        entries++;
+                    }
+                    assertEquals(expectEntriesNum, entries);
+
+                    //The maxCount is 20, the result entries reach maxSize 
limit.
+                    entries = 0;
+                    for (Enumeration<LedgerEntry> readEntries = 
lh.batchReadEntries(0, 20,
+                            expectEntriesNum * entrySize + headerSize + 
(expectEntriesNum * 4));
+                            readEntries.hasMoreElements();) {
+                        LedgerEntry entry = readEntries.nextElement();
+                        assertArrayEquals(data, entry.getEntry());
+                        entries++;
+                    }
+                    assertEquals(expectEntriesNum, entries);
+                }
+            }
+        }
+    }
+
     @SuppressWarnings("deprecation")
     @Test
     public void testReadEntryReleaseByteBufs() throws Exception {

Reply via email to