This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9573051 Improve tests for ReaderImpl and ManagedLedgerImpl (#4122)
9573051 is described below
commit 95730510d03aa60bf33baa41fedc80ec6cc60e02
Author: Ezequiel Lovelle <[email protected]>
AuthorDate: Wed Apr 24 17:38:26 2019 -0300
Improve tests for ReaderImpl and ManagedLedgerImpl (#4122)
- Improve timing tests replacing *before* and *after* class annotation
instead
of per method in TopicReaderTest. In this case there isn't any real
need to
setup the entire test environment for each test.
- Improve tests from latest changes in Reader in order to use
testMessageOrderAndDuplicates method.
- Prevents non-durable cursor being built with null cursor name.
- Add tests to assert cache on non-durable cursor implementation.
- Fix missed close() method from TopicReaderTest.
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 5 ++-
.../mledger/impl/NonDurableCursorTest.java | 46 ++++++++++++++++++++++
.../apache/pulsar/client/api/TopicReaderTest.java | 42 ++++++++++++++------
3 files changed, 80 insertions(+), 13 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index c777b60..4f72ba3 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -32,6 +32,7 @@ import java.util.Optional;
import java.util.Queue;
import java.util.Random;
import java.util.UUID;
+import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -826,12 +827,14 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
checkManagedLedgerIsOpen();
checkFenced();
- return new NonDurableCursorImpl(bookKeeper, config, this, null,
(PositionImpl) startCursorPosition);
+ return new NonDurableCursorImpl(bookKeeper, config, this, null,
+ (PositionImpl) startCursorPosition);
}
@Override
public ManagedCursor newNonDurableCursor(Position startCursorPosition,
String cursorName)
throws ManagedLedgerException {
+ Objects.requireNonNull(cursorName, "cursor name can't be null");
checkManagedLedgerIsOpen();
checkFenced();
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
index ff99da3..8e36532 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
@@ -576,5 +576,51 @@ public class NonDurableCursorTest extends
MockedBookKeeperTestCase {
assertEquals(c2.getNumberOfEntriesInBacklog(), 5);
}
+ @Test
+ void testCursorWithNameIsCachable() throws Exception {
+ final String p1CursorName = "entry-1";
+ final String p2CursorName = "entry-2";
+ ManagedLedger ledger = factory.open("my_test_ledger", new
ManagedLedgerConfig().setMaxEntriesPerLedger(1));
+
+ Position p1 = ledger.addEntry(p1CursorName.getBytes());
+ Position p2 = ledger.addEntry(p2CursorName.getBytes());
+
+ ManagedCursor c1 = ledger.newNonDurableCursor(p1, p1CursorName);
+ ManagedCursor c2 = ledger.newNonDurableCursor(p1, p1CursorName);
+ ManagedCursor c3 = ledger.newNonDurableCursor(p2, p2CursorName);
+ ManagedCursor c4 = ledger.newNonDurableCursor(p2, p2CursorName);
+
+ assertEquals(c1, c2);
+ assertEquals(c3, c4);
+
+ assertNotEquals(c1, c3);
+ assertNotEquals(c2, c3);
+ assertNotEquals(c1, c4);
+ assertNotEquals(c2, c4);
+
+ assertNotNull(c1.getName());
+ assertNotNull(c2.getName());
+ assertNotNull(c3.getName());
+ assertNotNull(c4.getName());
+ ledger.close();
+ }
+
+ @Test(expectedExceptions = NullPointerException.class)
+ void testCursorWithNameIsNotNull() throws Exception {
+ final String p1CursorName = "entry-1";
+ ManagedLedger ledger = factory.open("my_test_ledger", new
ManagedLedgerConfig().setMaxEntriesPerLedger(1));
+
+ Position p1 = ledger.addEntry(p1CursorName.getBytes());
+
+ try {
+ ledger.newNonDurableCursor(p1, null);
+ } catch (NullPointerException npe) {
+ assertEquals(npe.getMessage(), "cursor name can't be null");
+ throw npe;
+ } finally {
+ ledger.close();
+ }
+ }
+
private static final Logger log =
LoggerFactory.getLogger(NonDurableCursorTest.class);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
index baf3d13..4bfa589 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
@@ -40,21 +40,21 @@ import org.apache.pulsar.common.util.RelativeTimeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class TopicReaderTest extends ProducerConsumerBase {
private static final Logger log =
LoggerFactory.getLogger(TopicReaderTest.class);
- @BeforeMethod
+ @BeforeClass
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}
- @AfterMethod
+ @AfterClass
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
@@ -407,6 +407,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
// readNext should return null, after reach the end of topic.
assertNull(reader.readNext(1, TimeUnit.SECONDS));
+ reader.close();
producer.close();
}
@@ -448,6 +449,8 @@ public class TopicReaderTest extends ProducerConsumerBase {
}
assertFalse(reader.hasMessageAvailable());
+
+ reader.close();
producer.close();
}
@@ -487,10 +490,10 @@ public class TopicReaderTest extends ProducerConsumerBase
{
}
- @Test(timeOut = 10000)
+ @Test
public void testReaderNonDurableIsAbleToSeekRelativeTime() throws
Exception {
final int numOfMessage = 10;
- final String topicName = "persistent://my-property/my-ns/ReaderSeek";
+ final String topicName =
"persistent://my-property/my-ns/ReaderNonDurableIsAbleToSeekRelativeTime";
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName).create();
@@ -503,7 +506,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
.startMessageId(MessageId.earliest).create();
assertTrue(reader.hasMessageAvailable());
- ((ReaderImpl)
reader).getConsumer().seek(RelativeTimeUtil.parseRelativeTimeInSeconds("-1m"));
+ reader.seek(RelativeTimeUtil.parseRelativeTimeInSeconds("-1m"));
assertTrue(reader.hasMessageAvailable());
@@ -529,9 +532,12 @@ public class TopicReaderTest extends ProducerConsumerBase {
assertTrue(reader.hasMessageAvailable());
// Read all messages the first time
+ Set<String> messageSetA = Sets.newHashSet();
for (int i = 0; i < numOfMessage; i++) {
Message<byte[]> message = reader.readNext();
- Assert.assertEquals(message.getData(), String.format("msg num %d",
i).getBytes());
+ String receivedMessage = new String(message.getData());
+ String expectedMessage = String.format("msg num %d", i);
+ testMessageOrderAndDuplicates(messageSetA, receivedMessage,
expectedMessage);
}
assertFalse(reader.hasMessageAvailable());
@@ -540,9 +546,12 @@ public class TopicReaderTest extends ProducerConsumerBase {
reader.seek(RelativeTimeUtil.parseRelativeTimeInSeconds("-1m"));
// Read all messages a second time after seek()
+ Set<String> messageSetB = Sets.newHashSet();
for (int i = 0; i < numOfMessage; i++) {
Message<byte[]> message = reader.readNext();
- Assert.assertEquals(message.getData(), String.format("msg num %d",
i).getBytes());
+ String receivedMessage = new String(message.getData());
+ String expectedMessage = String.format("msg num %d", i);
+ testMessageOrderAndDuplicates(messageSetB, receivedMessage,
expectedMessage);
}
// Reader should be finished
@@ -574,9 +583,12 @@ public class TopicReaderTest extends ProducerConsumerBase {
// Read all messages the first time
MessageId midmessageToSeek = null;
+ Set<String> messageSetA = Sets.newHashSet();
for (int i = 0; i < numOfMessage; i++) {
Message<byte[]> message = reader.readNext();
- Assert.assertEquals(message.getData(), String.format("msg num %d",
i).getBytes());
+ String receivedMessage = new String(message.getData());
+ String expectedMessage = String.format("msg num %d", i);
+ testMessageOrderAndDuplicates(messageSetA, receivedMessage,
expectedMessage);
if (i == halfMessages) {
midmessageToSeek = message.getMessageId();
@@ -589,9 +601,12 @@ public class TopicReaderTest extends ProducerConsumerBase {
reader.seek(midmessageToSeek);
// Read all halved messages after seek()
+ Set<String> messageSetB = Sets.newHashSet();
for (int i = halfMessages + 1; i < numOfMessage; i++) {
Message<byte[]> message = reader.readNext();
- Assert.assertEquals(message.getData(), String.format("msg num %d",
i).getBytes());
+ String receivedMessage = new String(message.getData());
+ String expectedMessage = String.format("msg num %d", i);
+ testMessageOrderAndDuplicates(messageSetB, receivedMessage,
expectedMessage);
}
// Reader should be finished
@@ -624,9 +639,12 @@ public class TopicReaderTest extends ProducerConsumerBase {
int plusTime = (halfMessages + 1) * 100;
reader.seek(l + plusTime);
+ Set<String> messageSet = Sets.newHashSet();
for (int i = halfMessages; i < numOfMessage; i++) {
Message<byte[]> message = reader.readNext();
- Assert.assertEquals(message.getData(), String.format("msg num %d",
i).getBytes());
+ String receivedMessage = new String(message.getData());
+ String expectedMessage = String.format("msg num %d", i);
+ testMessageOrderAndDuplicates(messageSet, receivedMessage,
expectedMessage);
}
reader.close();