This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new eb27e6ed3c [BP-62] Add more test for Bookkeeper. (#4210)
eb27e6ed3c is described below
commit eb27e6ed3ccd1ee46f372ae6fd0a53ccea81908f
Author: Yan Zhao <[email protected]>
AuthorDate: Wed Feb 21 02:14:12 2024 +0800
[BP-62] Add more test for Bookkeeper. (#4210)
* Add test to cover Bookkeeper test.
* fix ci.
* Fix ci.
---
.../benchmark/BenchReadThroughputLatency.java | 27 ++--
.../org/apache/bookkeeper/client/LedgerHandle.java | 4 +-
.../bookkeeper/conf/ClientConfiguration.java | 5 +
.../apache/bookkeeper/client/BookKeeperTest.java | 179 +++++++++++++++++++++
4 files changed, 204 insertions(+), 11 deletions(-)
diff --git
a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java
b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java
index f3247ab621..5c04558b84 100644
---
a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java
+++
b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java
@@ -73,7 +73,7 @@ public class BenchReadThroughputLatency {
}
};
- private static void readLedger(ClientConfiguration conf, long ledgerId,
byte[] passwd) {
+ private static void readLedger(ClientConfiguration conf, long ledgerId,
byte[] passwd, int batchEntries) {
LOG.info("Reading ledger {}", ledgerId);
BookKeeper bk = null;
long time = 0;
@@ -102,17 +102,23 @@ public class BenchReadThroughputLatency {
}
long starttime = System.nanoTime();
- while (lastRead < lastConfirmed) {
+ while (entriesRead <= lastConfirmed) {
long nextLimit = lastRead + 100000;
- long readTo = Math.min(nextLimit, lastConfirmed);
- Enumeration<LedgerEntry> entries = lh.readEntries(lastRead
+ 1, readTo);
- lastRead = readTo;
+ Enumeration<LedgerEntry> entries;
+ if (batchEntries <= 0) {
+ long readTo = Math.min(nextLimit, lastConfirmed);
+ entries = lh.readEntries(lastRead + 1, readTo);
+ } else {
+ entries = lh.batchReadEntries(lastRead, batchEntries,
-1);
+ }
while (entries.hasMoreElements()) {
LedgerEntry e = entries.nextElement();
entriesRead++;
+ lastRead = e.getEntryId();
if ((entriesRead % 10000) == 0) {
- LOG.info("{} entries read", entriesRead);
+ LOG.info("{} entries read from ledger {}",
entriesRead, ledgerId);
}
+ e.getEntryBuffer().release();
}
}
long endtime = System.nanoTime();
@@ -159,6 +165,8 @@ public class BenchReadThroughputLatency {
options.addOption("sockettimeout", true, "Socket timeout for
bookkeeper client. In seconds. Default 5");
options.addOption("useV2", false, "Whether use V2 protocol to read
ledgers from the bookie server.");
options.addOption("help", false, "This message");
+ options.addOption("batchentries", true, "The batch read entries count.
"
+ + "If the value is greater than 0, uses batch read. Or uses
the single read. Default 1000");
CommandLineParser parser = new PosixParser();
CommandLine cmd = parser.parse(options, args);
@@ -171,6 +179,7 @@ public class BenchReadThroughputLatency {
final String servers = cmd.getOptionValue("zookeeper",
"localhost:2181");
final byte[] passwd = cmd.getOptionValue("password",
"benchPasswd").getBytes(UTF_8);
final int sockTimeout =
Integer.parseInt(cmd.getOptionValue("sockettimeout", "5"));
+ final int batchentries =
Integer.parseInt(cmd.getOptionValue("batchentries", "1000"));
if (cmd.hasOption("ledger") && cmd.hasOption("listen")) {
LOG.error("Cannot used -ledger and -listen together");
usage(options);
@@ -210,7 +219,7 @@ public class BenchReadThroughputLatency {
try {
if (event.getType() == Event.EventType.NodeCreated
&& event.getPath().equals(nodepath)) {
- readLedger(conf, ledger.get(), passwd);
+ readLedger(conf, ledger.get(), passwd,
batchentries);
shutdownLatch.countDown();
} else if (event.getType() ==
Event.EventType.NodeChildrenChanged) {
if (numLedgers.get() < 0) {
@@ -236,7 +245,7 @@ public class BenchReadThroughputLatency {
Thread t = new Thread() {
@Override
public void run() {
- readLedger(conf, ledgerId,
passwd);
+ readLedger(conf, ledgerId,
passwd, batchentries);
}
};
t.start();
@@ -259,7 +268,7 @@ public class BenchReadThroughputLatency {
if (ledger.get() != 0) {
if (zk.exists(nodepath, true) != null) {
- readLedger(conf, ledger.get(), passwd);
+ readLedger(conf, ledger.get(), passwd, batchentries);
shutdownLatch.countDown();
} else {
LOG.info("Watching for creation of" + nodepath);
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index d56e10872a..a9698b6c7e 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -651,7 +651,7 @@ public class LedgerHandle implements WriteHandle {
* the total entries count.
* @param maxSize
* the total entries size.
- * @see #asyncBatchReadEntries(long, int, long, boolean, ReadCallback,
Object)
+ * @see #asyncBatchReadEntries(long, int, long, ReadCallback, Object)
*/
public Enumeration<LedgerEntry> batchReadEntries(long startEntry, int
maxCount, long maxSize)
throws InterruptedException, BKException {
@@ -688,7 +688,7 @@ public class LedgerHandle implements WriteHandle {
/**
* Read a sequence of entries synchronously, allowing to read after the
LastAddConfirmed range.<br>
* This is the same of
- * {@link #asyncBatchReadUnconfirmedEntries(long, int, long, boolean,
ReadCallback, Object) }
+ * {@link #asyncBatchReadUnconfirmedEntries(long, int, long, ReadCallback,
Object) }
*
* @param firstEntry
* id of first entry of sequence (included)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 03eb6d1abd..4b6537bc62 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -2098,6 +2098,11 @@ public class ClientConfiguration extends
AbstractConfiguration<ClientConfigurati
return getLong(CLIENT_CONNECT_BOOKIE_UNAVAILABLE_LOG_THROTTLING,
5_000L);
}
+ public ClientConfiguration setBatchReadEnabled(boolean enable) {
+ setProperty(BATCH_READ_ENABLED, enable);
+ return this;
+ }
+
public boolean isBatchReadEnabled() {
return getBoolean(BATCH_READ_ENABLED, true);
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
index 8f64c256a4..fd0a2a9b76 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
@@ -610,6 +610,185 @@ public class BookKeeperTest extends
BookKeeperClusterTestCase {
}
}
+ @Test
+ public void testBatchReadFailBackToSingleRead1() throws Exception {
+ ClientConfiguration conf = new ClientConfiguration();
+ conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+ int numEntries = 100;
+ byte[] data = "foobar".getBytes();
+ try (BookKeeper bkc = new BookKeeper(conf)) {
+ // basic read/write
+ {
+ long ledgerId;
+ try (LedgerHandle lh = bkc.createLedger(2, 2, 2,
+ digestType, "testPasswd".getBytes())) {
+ ledgerId = lh.getId();
+ for (int i = 0; i < numEntries; i++) {
+ lh.addEntry(data);
+ }
+ }
+ try (LedgerHandle lh = bkc.openLedger(ledgerId, digestType,
"testPasswd".getBytes())) {
+ assertEquals(numEntries - 1, lh.readLastConfirmed());
+ //V3 protocol not support batch read. In theory, it will
throw UnsupportedOperationException.
+ try {
+ lh.batchReadEntries(0, numEntries, 5 * 1024 * 1024);
+ fail("Should throw UnsupportedOperationException.");
+ } catch (UnsupportedOperationException e) {
+ assertEquals("Unsupported batch read entry operation
for v3 protocol.", e.getMessage());
+ }
+ }
+ }
+ }
+
+ try (BookKeeper bkc = new BookKeeper(conf)) {
+ // basic read/write
+ {
+ long ledgerId;
+ try (LedgerHandle lh = bkc.createLedger(3, 2, 2,
+ digestType, "testPasswd".getBytes())) {
+ ledgerId = lh.getId();
+ for (int i = 0; i < numEntries; i++) {
+ lh.addEntry(data);
+ }
+ }
+ try (LedgerHandle lh = bkc.openLedger(ledgerId, digestType,
"testPasswd".getBytes())) {
+ assertEquals(numEntries - 1, lh.readLastConfirmed());
+                    //The ledger ensemble size does not equal the write quorum, so the client falls back to
+                    //single reads; the data can still be read successfully.
+ for (Enumeration<LedgerEntry> readEntries =
lh.batchReadEntries(0, numEntries, 5 * 1024 * 1024);
+ readEntries.hasMoreElements();) {
+ LedgerEntry entry = readEntries.nextElement();
+ assertArrayEquals(data, entry.getEntry());
+ }
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testBatchReadFailBackToSingleRead2() throws Exception {
+ ClientConfiguration conf = new ClientConfiguration();
+ conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+ int numEntries = 100;
+ byte[] data = "foobar".getBytes();
+ try (BookKeeper bkc = new BookKeeper(conf)) {
+ // basic read/write
+ {
+ long ledgerId;
+ try (LedgerHandle lh = bkc.createLedger(2, 2, 2,
+ digestType, "testPasswd".getBytes())) {
+ ledgerId = lh.getId();
+ for (int i = 0; i < numEntries; i++) {
+ lh.addEntry(data);
+ }
+ }
+ try (LedgerHandle lh = bkc.openLedger(ledgerId, digestType,
"testPasswd".getBytes())) {
+ assertEquals(numEntries - 1, lh.readLastConfirmed());
+                    //The V3 protocol does not support batch read, so it will throw UnsupportedOperationException.
+ try {
+ lh.batchReadEntries(0, numEntries, 5 * 1024 * 1024);
+ fail("Should throw UnsupportedOperationException.");
+ } catch (UnsupportedOperationException e) {
+ assertEquals("Unsupported batch read entry operation
for v3 protocol.", e.getMessage());
+ }
+ }
+ }
+ }
+
+ conf.setBatchReadEnabled(false);
+ try (BookKeeper bkc = new BookKeeper(conf)) {
+ // basic read/write
+ {
+ long ledgerId;
+ try (LedgerHandle lh = bkc.createLedger(2, 2, 2,
+ digestType, "testPasswd".getBytes())) {
+ ledgerId = lh.getId();
+ for (int i = 0; i < numEntries; i++) {
+ lh.addEntry(data);
+ }
+ }
+ try (LedgerHandle lh = bkc.openLedger(ledgerId, digestType,
"testPasswd".getBytes())) {
+ assertEquals(numEntries - 1, lh.readLastConfirmed());
+                    //Batch read is disabled in the configuration, so the client falls back to
+                    //single reads; the data can still be read successfully.
+ for (Enumeration<LedgerEntry> readEntries =
lh.batchReadEntries(0, numEntries, 5 * 1024 * 1024);
+ readEntries.hasMoreElements();) {
+ LedgerEntry entry = readEntries.nextElement();
+ assertArrayEquals(data, entry.getEntry());
+ }
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testBatchReadWithV2Protocol() throws Exception {
+ ClientConfiguration conf = new
ClientConfiguration().setUseV2WireProtocol(true);
+ conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+ int numEntries = 100;
+ byte[] data = "foobar".getBytes();
+ try (BookKeeper bkc = new BookKeeper(conf)) {
+ // basic read/write
+ {
+ long ledgerId;
+ try (LedgerHandle lh = bkc.createLedger(2, 2, 2, digestType,
"testPasswd".getBytes())) {
+ ledgerId = lh.getId();
+ for (int i = 0; i < numEntries; i++) {
+ lh.addEntry(data);
+ }
+ }
+ try (LedgerHandle lh = bkc.openLedger(ledgerId, digestType,
"testPasswd".getBytes())) {
+ assertEquals(numEntries - 1, lh.readLastConfirmed());
+ int entries = 0;
+ for (Enumeration<LedgerEntry> readEntries =
lh.batchReadEntries(0, numEntries, 5 * 1024 * 1024);
+ readEntries.hasMoreElements();) {
+ LedgerEntry entry = readEntries.nextElement();
+ assertArrayEquals(data, entry.getEntry());
+ entries++;
+ }
+ assertEquals(numEntries, entries);
+
+ //The maxCount is 0, the result is only limited by maxSize.
+ entries = 0;
+ for (Enumeration<LedgerEntry> readEntries =
lh.batchReadEntries(0, 0, 5 * 1024 * 1024);
+ readEntries.hasMoreElements();) {
+ LedgerEntry entry = readEntries.nextElement();
+ assertArrayEquals(data, entry.getEntry());
+ entries++;
+ }
+ assertEquals(numEntries, entries);
+
+ // one entry size = 8(ledgerId) + 8(entryId) + 8(lac) +
8(length) + 8(digest) + payload size
+ long entrySize = 8 + 8 + 8 + 8 + 8 + data.length;
+ //response header size.
+ int headerSize = 24 + 8 + 4;
+ //The maxCount is 0, the result is only limited by maxSize.
+ entries = 0;
+ int expectEntriesNum = 5;
+ for (Enumeration<LedgerEntry> readEntries =
lh.batchReadEntries(0, 0,
+ expectEntriesNum * entrySize + headerSize +
(expectEntriesNum * 4));
+ readEntries.hasMoreElements();) {
+ LedgerEntry entry = readEntries.nextElement();
+ assertArrayEquals(data, entry.getEntry());
+ entries++;
+ }
+ assertEquals(expectEntriesNum, entries);
+
+                    //The maxCount is 20, but the returned entries hit the maxSize limit first.
+ entries = 0;
+ for (Enumeration<LedgerEntry> readEntries =
lh.batchReadEntries(0, 20,
+ expectEntriesNum * entrySize + headerSize +
(expectEntriesNum * 4));
+ readEntries.hasMoreElements();) {
+ LedgerEntry entry = readEntries.nextElement();
+ assertArrayEquals(data, entry.getEntry());
+ entries++;
+ }
+ assertEquals(expectEntriesNum, entries);
+ }
+ }
+ }
+ }
+
@SuppressWarnings("deprecation")
@Test
public void testReadEntryReleaseByteBufs() throws Exception {