This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 372575a [issue #3975] Bugfix NPE on non durable consumer (#3988)
372575a is described below
commit 372575a9877bf50e8f55a9568ef6c07fcae86644
Author: Ezequiel Lovelle <[email protected]>
AuthorDate: Mon Apr 8 23:01:19 2019 -0300
[issue #3975] Bugfix NPE on non durable consumer (#3988)
*Motivation*
Trying to fix #3975
When a cursor reset is performed with a timestamp on a non-durable
consumer, the message finder fails with a null pointer exception because
`cursor.getName()` is null.
*Modifications*
- Add an overload of `newNonDurableCursor()` that takes a subscription
name.
- Fix method `getNonDurableSubscription` to call `newNonDurableCursor()`
with the proper subscription name.
- Add a test that covers the issue.
---
.../apache/bookkeeper/mledger/ManagedLedger.java | 1 +
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 10 ++++++++
.../broker/service/persistent/PersistentTopic.java | 2 +-
.../apache/pulsar/client/api/TopicReaderTest.java | 28 +++++++++++++++++++++-
4 files changed, 39 insertions(+), 2 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index 00fc2d0..d51b0d8 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -193,6 +193,7 @@ public interface ManagedLedger {
* @return the new NonDurableCursor
*/
ManagedCursor newNonDurableCursor(Position startCursorPosition) throws
ManagedLedgerException;
+ ManagedCursor newNonDurableCursor(Position startPosition, String
subscriptionName) throws ManagedLedgerException;
/**
* Delete a ManagedCursor asynchronously.
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index a398162..7a4eb60 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -845,6 +845,16 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
}
@Override
+ public ManagedCursor newNonDurableCursor(Position startCursorPosition,
String cursorName)
+ throws ManagedLedgerException {
+ checkManagedLedgerIsOpen();
+ checkFenced();
+
+ return new NonDurableCursorImpl(bookKeeper, config, this, cursorName,
+ (PositionImpl) startCursorPosition);
+ }
+
+ @Override
public Iterable<ManagedCursor> getCursors() {
return cursors;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 1039437..e1a0509 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -641,7 +641,7 @@ public class PersistentTopic implements Topic,
AddEntryCallback {
Position startPosition = new PositionImpl(ledgerId, entryId);
ManagedCursor cursor = null;
try {
- cursor = ledger.newNonDurableCursor(startPosition);
+ cursor = ledger.newNonDurableCursor(startPosition,
subscriptionName);
} catch (ManagedLedgerException e) {
subscriptionFuture.completeExceptionally(e);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
index 6318974..c1e93aa 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
@@ -34,7 +34,9 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.ReaderImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.util.RelativeTimeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -479,9 +481,33 @@ public class TopicReaderTest extends ProducerConsumerBase {
assertTrue(reader.hasMessageAvailable());
String readOut = new String(reader.readNext().getData());
- assertTrue(readOut.equals(content));
+ assertEquals(content, readOut);
assertFalse(reader.hasMessageAvailable());
}
}
+
+ @Test(timeOut = 10000)
+ public void testReaderNonDurableIsAbleToSeekRelativeTime() throws
Exception {
+ final int numOfMessage = 10;
+ final String topicName = "persistent://my-property/my-ns/ReaderSeek";
+
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topicName).create();
+
+ for (int i = 0; i < numOfMessage; i++) {
+ producer.send(String.format("msg num %d", i).getBytes());
+ }
+
+ Reader<byte[]> reader = pulsarClient.newReader().topic(topicName)
+ .startMessageId(MessageId.earliest).create();
+ assertTrue(reader.hasMessageAvailable());
+
+ ((ReaderImpl)
reader).getConsumer().seek(RelativeTimeUtil.parseRelativeTimeInSeconds("-1m"));
+
+ assertTrue(reader.hasMessageAvailable());
+
+ reader.close();
+ producer.close();
+ }
}