This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9573051  Improve tests for ReaderImpl and ManagedLedgerImpl (#4122)
9573051 is described below

commit 95730510d03aa60bf33baa41fedc80ec6cc60e02
Author: Ezequiel Lovelle <[email protected]>
AuthorDate: Wed Apr 24 17:38:26 2019 -0300

    Improve tests for ReaderImpl and ManagedLedgerImpl (#4122)
    
    - Improve test timing in TopicReaderTest by using the *before* and *after*
        class annotations instead of the per-method ones. In this case there
        isn't any real need to set up the entire test environment for each test.
      - Improve the tests for the latest Reader changes so that they use the
        testMessageOrderAndDuplicates method.
      - Prevent a non-durable cursor from being built with a null cursor name.
      - Add tests to assert cache on non-durable cursor implementation.
      - Fix missing close() calls in TopicReaderTest.
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  5 ++-
 .../mledger/impl/NonDurableCursorTest.java         | 46 ++++++++++++++++++++++
 .../apache/pulsar/client/api/TopicReaderTest.java  | 42 ++++++++++++++------
 3 files changed, 80 insertions(+), 13 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index c777b60..4f72ba3 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -32,6 +32,7 @@ import java.util.Optional;
 import java.util.Queue;
 import java.util.Random;
 import java.util.UUID;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -826,12 +827,14 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         checkManagedLedgerIsOpen();
         checkFenced();
 
-        return new NonDurableCursorImpl(bookKeeper, config, this, null, 
(PositionImpl) startCursorPosition);
+        return new NonDurableCursorImpl(bookKeeper, config, this, null,
+                (PositionImpl) startCursorPosition);
     }
 
     @Override
     public ManagedCursor newNonDurableCursor(Position startCursorPosition, 
String cursorName)
             throws ManagedLedgerException {
+        Objects.requireNonNull(cursorName, "cursor name can't be null");
         checkManagedLedgerIsOpen();
         checkFenced();
 
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
index ff99da3..8e36532 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
@@ -576,5 +576,51 @@ public class NonDurableCursorTest extends 
MockedBookKeeperTestCase {
         assertEquals(c2.getNumberOfEntriesInBacklog(), 5);
     }
 
+    @Test
+    void testCursorWithNameIsCachable() throws Exception {
+        final String p1CursorName = "entry-1";
+        final String p2CursorName = "entry-2";
+        ManagedLedger ledger = factory.open("my_test_ledger", new 
ManagedLedgerConfig().setMaxEntriesPerLedger(1));
+
+        Position p1 = ledger.addEntry(p1CursorName.getBytes());
+        Position p2 = ledger.addEntry(p2CursorName.getBytes());
+
+        ManagedCursor c1 = ledger.newNonDurableCursor(p1, p1CursorName);
+        ManagedCursor c2 = ledger.newNonDurableCursor(p1, p1CursorName);
+        ManagedCursor c3 = ledger.newNonDurableCursor(p2, p2CursorName);
+        ManagedCursor c4 = ledger.newNonDurableCursor(p2, p2CursorName);
+
+        assertEquals(c1, c2);
+        assertEquals(c3, c4);
+
+        assertNotEquals(c1, c3);
+        assertNotEquals(c2, c3);
+        assertNotEquals(c1, c4);
+        assertNotEquals(c2, c4);
+
+        assertNotNull(c1.getName());
+        assertNotNull(c2.getName());
+        assertNotNull(c3.getName());
+        assertNotNull(c4.getName());
+        ledger.close();
+    }
+
+    @Test(expectedExceptions = NullPointerException.class)
+    void testCursorWithNameIsNotNull() throws Exception {
+        final String p1CursorName = "entry-1";
+        ManagedLedger ledger = factory.open("my_test_ledger", new 
ManagedLedgerConfig().setMaxEntriesPerLedger(1));
+
+        Position p1 = ledger.addEntry(p1CursorName.getBytes());
+
+        try {
+            ledger.newNonDurableCursor(p1, null);
+        } catch (NullPointerException npe) {
+            assertEquals(npe.getMessage(), "cursor name can't be null");
+            throw npe;
+        } finally {
+            ledger.close();
+        }
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(NonDurableCursorTest.class);
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
index baf3d13..4bfa589 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
@@ -40,21 +40,21 @@ import org.apache.pulsar.common.util.RelativeTimeUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 public class TopicReaderTest extends ProducerConsumerBase {
     private static final Logger log = 
LoggerFactory.getLogger(TopicReaderTest.class);
 
-    @BeforeMethod
+    @BeforeClass
     @Override
     protected void setup() throws Exception {
         super.internalSetup();
         super.producerBaseSetup();
     }
 
-    @AfterMethod
+    @AfterClass
     @Override
     protected void cleanup() throws Exception {
         super.internalCleanup();
@@ -407,6 +407,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
         // readNext should return null, after reach the end of topic.
         assertNull(reader.readNext(1, TimeUnit.SECONDS));
 
+        reader.close();
         producer.close();
     }
 
@@ -448,6 +449,8 @@ public class TopicReaderTest extends ProducerConsumerBase {
         }
 
         assertFalse(reader.hasMessageAvailable());
+
+        reader.close();
         producer.close();
     }
 
@@ -487,10 +490,10 @@ public class TopicReaderTest extends ProducerConsumerBase 
{
 
     }
 
-    @Test(timeOut = 10000)
+    @Test
     public void testReaderNonDurableIsAbleToSeekRelativeTime() throws 
Exception {
         final int numOfMessage = 10;
-        final String topicName = "persistent://my-property/my-ns/ReaderSeek";
+        final String topicName = 
"persistent://my-property/my-ns/ReaderNonDurableIsAbleToSeekRelativeTime";
 
         Producer<byte[]> producer = pulsarClient.newProducer()
                 .topic(topicName).create();
@@ -503,7 +506,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
                 .startMessageId(MessageId.earliest).create();
         assertTrue(reader.hasMessageAvailable());
 
-        ((ReaderImpl) 
reader).getConsumer().seek(RelativeTimeUtil.parseRelativeTimeInSeconds("-1m"));
+        reader.seek(RelativeTimeUtil.parseRelativeTimeInSeconds("-1m"));
 
         assertTrue(reader.hasMessageAvailable());
 
@@ -529,9 +532,12 @@ public class TopicReaderTest extends ProducerConsumerBase {
         assertTrue(reader.hasMessageAvailable());
 
         // Read all messages the first time
+        Set<String> messageSetA = Sets.newHashSet();
         for (int i = 0; i < numOfMessage; i++) {
             Message<byte[]> message = reader.readNext();
-            Assert.assertEquals(message.getData(), String.format("msg num %d", 
i).getBytes());
+            String receivedMessage = new String(message.getData());
+            String expectedMessage = String.format("msg num %d", i);
+            testMessageOrderAndDuplicates(messageSetA, receivedMessage, 
expectedMessage);
         }
 
         assertFalse(reader.hasMessageAvailable());
@@ -540,9 +546,12 @@ public class TopicReaderTest extends ProducerConsumerBase {
         reader.seek(RelativeTimeUtil.parseRelativeTimeInSeconds("-1m"));
 
         // Read all messages a second time after seek()
+        Set<String> messageSetB = Sets.newHashSet();
         for (int i = 0; i < numOfMessage; i++) {
             Message<byte[]> message = reader.readNext();
-            Assert.assertEquals(message.getData(), String.format("msg num %d", 
i).getBytes());
+            String receivedMessage = new String(message.getData());
+            String expectedMessage = String.format("msg num %d", i);
+            testMessageOrderAndDuplicates(messageSetB, receivedMessage, 
expectedMessage);
         }
 
         // Reader should be finished
@@ -574,9 +583,12 @@ public class TopicReaderTest extends ProducerConsumerBase {
 
         // Read all messages the first time
         MessageId midmessageToSeek = null;
+        Set<String> messageSetA = Sets.newHashSet();
         for (int i = 0; i < numOfMessage; i++) {
             Message<byte[]> message = reader.readNext();
-            Assert.assertEquals(message.getData(), String.format("msg num %d", 
i).getBytes());
+            String receivedMessage = new String(message.getData());
+            String expectedMessage = String.format("msg num %d", i);
+            testMessageOrderAndDuplicates(messageSetA, receivedMessage, 
expectedMessage);
 
             if (i == halfMessages) {
                 midmessageToSeek = message.getMessageId();
@@ -589,9 +601,12 @@ public class TopicReaderTest extends ProducerConsumerBase {
         reader.seek(midmessageToSeek);
 
         // Read all halved messages after seek()
+        Set<String> messageSetB = Sets.newHashSet();
         for (int i = halfMessages + 1; i < numOfMessage; i++) {
             Message<byte[]> message = reader.readNext();
-            Assert.assertEquals(message.getData(), String.format("msg num %d", 
i).getBytes());
+            String receivedMessage = new String(message.getData());
+            String expectedMessage = String.format("msg num %d", i);
+            testMessageOrderAndDuplicates(messageSetB, receivedMessage, 
expectedMessage);
         }
 
         // Reader should be finished
@@ -624,9 +639,12 @@ public class TopicReaderTest extends ProducerConsumerBase {
         int plusTime = (halfMessages + 1) * 100;
         reader.seek(l + plusTime);
 
+        Set<String> messageSet = Sets.newHashSet();
         for (int i = halfMessages; i < numOfMessage; i++) {
             Message<byte[]> message = reader.readNext();
-            Assert.assertEquals(message.getData(), String.format("msg num %d", 
i).getBytes());
+            String receivedMessage = new String(message.getData());
+            String expectedMessage = String.format("msg num %d", i);
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, 
expectedMessage);
         }
 
         reader.close();

Reply via email to