sijie closed pull request #1389: Add smoke tests for LedgerHandleAdv and
TailingReads
URL: https://github.com/apache/bookkeeper/pull/1389
This is a PR merged from a forked repository.
As GitHub hides the original diff of a foreign pull request (from a fork)
once it is merged, the diff is supplied below for the sake of provenance:
diff --git a/pom.xml b/pom.xml
index 0fef32aa5..ea2e57098 100644
--- a/pom.xml
+++ b/pom.xml
@@ -103,6 +103,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<javac.target>1.8</javac.target>
+ <redirectTestOutputToFile>true</redirectTestOutputToFile>
<!-- dependencies -->
<arquillian-cube.version>1.15.1</arquillian-cube.version>
<arquillian-junit.version>1.1.14.Final</arquillian-junit.version>
diff --git a/tests/integration/smoke/pom.xml b/tests/integration/smoke/pom.xml
index f7442dd8b..591b7404d 100644
--- a/tests/integration/smoke/pom.xml
+++ b/tests/integration/smoke/pom.xml
@@ -45,6 +45,7 @@
<configuration>
<!-- smoke test should never flake //-->
<rerunFailingTestsCount>0</rerunFailingTestsCount>
+ <redirectTestOutputToFile>${redirectTestOutputToFile}</redirectTestOutputToFile>
</configuration>
</plugin>
</plugins>
diff --git a/tests/integration/smoke/src/test/java/org/apache/bookkeeper/tests/integration/TestSmoke.java b/tests/integration/smoke/src/test/java/org/apache/bookkeeper/tests/integration/TestSmoke.java
index 6a80c0971..1ab0aff07 100644
--- a/tests/integration/smoke/src/test/java/org/apache/bookkeeper/tests/integration/TestSmoke.java
+++ b/tests/integration/smoke/src/test/java/org/apache/bookkeeper/tests/integration/TestSmoke.java
@@ -17,29 +17,36 @@
*/
package org.apache.bookkeeper.tests.integration;
-import com.github.dockerjava.api.DockerClient;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+import static org.junit.Assert.assertEquals;
+import com.github.dockerjava.api.DockerClient;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Enumeration;
-
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.tests.BookKeeperClusterUtils;
-
import org.jboss.arquillian.junit.Arquillian;
import org.jboss.arquillian.test.api.ArquillianResource;
-
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+@Slf4j
@RunWith(Arquillian.class)
public class TestSmoke {
- private static final Logger LOG = LoggerFactory.getLogger(TestSmoke.class);
private static byte[] PASSWD = "foobar".getBytes();
@ArquillianResource
@@ -48,42 +55,143 @@
private String currentVersion = System.getProperty("currentVersion");
@Before
- public void before() throws Exception {
+ public void setup() throws Exception {
// First test to run, formats metadata and bookies
if (BookKeeperClusterUtils.metadataFormatIfNeeded(docker,
currentVersion)) {
BookKeeperClusterUtils.formatAllBookies(docker, currentVersion);
}
+
Assert.assertTrue(BookKeeperClusterUtils.startAllBookiesWithVersion(docker,
currentVersion));
}
- @Test
- public void testBootWriteReadShutdown() throws Exception {
- Assert.assertTrue(BookKeeperClusterUtils.startAllBookiesWithVersion(docker, currentVersion));
+ @After
+ public void teardown() throws Exception {
+ Assert.assertTrue(BookKeeperClusterUtils.stopAllBookies(docker));
+ }
+ @Test
+ public void testReadWrite() throws Exception {
String zookeeper =
BookKeeperClusterUtils.zookeeperConnectString(docker);
- BookKeeper bk = new BookKeeper(zookeeper);
- long ledgerId;
- try (LedgerHandle writelh =
bk.createLedger(BookKeeper.DigestType.CRC32, PASSWD)) {
- ledgerId = writelh.getId();
- for (int i = 0; i < 100; i++) {
- writelh.addEntry(("entry-" + i).getBytes());
+ try (BookKeeper bk = new BookKeeper(zookeeper)) {
+ long ledgerId;
+ try (LedgerHandle writelh =
bk.createLedger(BookKeeper.DigestType.CRC32C, PASSWD)) {
+ ledgerId = writelh.getId();
+ for (int i = 0; i < 100; i++) {
+ writelh.addEntry(("entry-" + i).getBytes());
+ }
+ }
+
+ try (LedgerHandle readlh = bk.openLedger(ledgerId,
BookKeeper.DigestType.CRC32C, PASSWD)) {
+ long lac = readlh.getLastAddConfirmed();
+ int i = 0;
+ Enumeration<LedgerEntry> entries = readlh.readEntries(0, lac);
+ while (entries.hasMoreElements()) {
+ LedgerEntry e = entries.nextElement();
+ String readBack = new String(e.getEntry());
+ assertEquals(readBack, "entry-" + i++);
+ }
+ assertEquals(i, 100);
}
}
+ }
- try (LedgerHandle readlh = bk.openLedger(ledgerId,
BookKeeper.DigestType.CRC32, PASSWD)) {
- long lac = readlh.getLastAddConfirmed();
- int i = 0;
- Enumeration<LedgerEntry> entries = readlh.readEntries(0, lac);
- while (entries.hasMoreElements()) {
- LedgerEntry e = entries.nextElement();
- String readBack = new String(e.getEntry());
- Assert.assertEquals(readBack, "entry-" + i++);
+ @Test
+ public void testReadWriteAdv() throws Exception {
+ String zookeeper =
BookKeeperClusterUtils.zookeeperConnectString(docker);
+ try (BookKeeper bk = new BookKeeper(zookeeper)) {
+ long ledgerId;
+ try (LedgerHandle writelh = bk.createLedgerAdv(3, 3, 2,
BookKeeper.DigestType.CRC32C, PASSWD)) {
+ ledgerId = writelh.getId();
+ for (int i = 0; i < 100; i++) {
+ writelh.addEntry(i, ("entry-" + i).getBytes());
+ }
+ }
+
+ try (LedgerHandle readlh = bk.openLedger(ledgerId,
BookKeeper.DigestType.CRC32C, PASSWD)) {
+ long lac = readlh.getLastAddConfirmed();
+ int i = 0;
+ Enumeration<LedgerEntry> entries = readlh.readEntries(0, lac);
+ while (entries.hasMoreElements()) {
+ LedgerEntry e = entries.nextElement();
+ String readBack = new String(e.getEntry());
+ assertEquals(readBack, "entry-" + i++);
+ }
+ assertEquals(i, 100);
}
- Assert.assertEquals(i, 100);
}
+ }
- bk.close();
+ @Test
+ public void testTailingReads() throws Exception {
+ String zookeeper =
BookKeeperClusterUtils.zookeeperConnectString(docker);
+ @Cleanup BookKeeper bk = new BookKeeper(zookeeper);
+ @Cleanup LedgerHandle writeLh = bk.createLedger(DigestType.CRC32C,
PASSWD);
+ @Cleanup("shutdown") ExecutorService writeExecutor =
Executors.newSingleThreadExecutor(
+ new ThreadFactoryBuilder().setNameFormat("write-executor").build());
+
+ @Cleanup LedgerHandle readLh =
bk.openLedgerNoRecovery(writeLh.getId(), DigestType.CRC32C, PASSWD);
+ @Cleanup("shutdown") ExecutorService readExecutor =
Executors.newSingleThreadExecutor(
+ new ThreadFactoryBuilder().setNameFormat("read-executor").build());
+
+ int numEntries = 100;
+ CompletableFuture<Void> readFuture = new CompletableFuture<>();
+ CompletableFuture<Void> writeFuture = new CompletableFuture<>();
+
+ // start the read thread
+ readExecutor.submit(() -> {
+ long lastExpectedConfirmedEntryId = numEntries - 2;
+ long nextEntryId = 0L;
+ try {
+ while (nextEntryId <= lastExpectedConfirmedEntryId) {
+ long lac = readLh.getLastAddConfirmed();
+ while (lac >= nextEntryId) {
+ Enumeration<LedgerEntry> entries =
readLh.readEntries(nextEntryId, lac);
+ while (entries.hasMoreElements()) {
+ LedgerEntry e = entries.nextElement();
+ String readBack = new String(e.getEntry(), UTF_8);
+ log.info("Read entry {} : {}", e.getEntryId(),
readBack);
+ assertEquals(readBack, "entry-" + (nextEntryId++));
+ }
+ assertEquals(lac + 1, nextEntryId);
+ }
+
+ if (nextEntryId >= lastExpectedConfirmedEntryId) {
+ break;
+ }
+
+ // refresh lac
+ while (readLh.readLastConfirmed() < nextEntryId) {
+ TimeUnit.MILLISECONDS.sleep(100L);
+ }
+ }
+ FutureUtils.complete(readFuture, null);
+ log.info("Completed tailing read ledger {}", writeLh.getId());
+ } catch (Exception e) {
+ log.error("Exceptions thrown during tailing read ledger {}",
writeLh.getId(), e);
+ readFuture.completeExceptionally(e);
+ }
+ });
+
+ // start the write thread
+ writeExecutor.submit(() -> {
+ try {
+ for (int i = 0; i < 100; i++) {
+ writeLh.addEntry(("entry-" + i).getBytes());
+ }
+ log.info("Completed writing {} entries to ledger {}",
numEntries, writeLh.getId());
+ FutureUtils.complete(writeFuture, null);
+ } catch (Exception e) {
+ log.error("Exceptions thrown during writing {} entries to ledger {}", numEntries, writeLh.getId(), e);
+ writeFuture.completeExceptionally(e);
+ }
+ });
- Assert.assertTrue(BookKeeperClusterUtils.stopAllBookies(docker));
+ // both write and read should be successful
+ result(readFuture);
+ result(writeFuture);
+
+ assertEquals(readLh.getLastAddConfirmed(), numEntries - 2);
+ assertEquals(writeLh.getLastAddConfirmed(), numEntries - 1);
+ assertEquals(writeLh.getLastAddPushed(), numEntries - 1);
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services