This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2de50a7  Issue #1517: make getLastConfirmedEntry in ManagedLedgerImpl return real LAC (#1550)
2de50a7 is described below

commit 2de50a762628647fcf1b7873b325fb1103c2b198
Author: Jia Zhai <zhaiji...@gmail.com>
AuthorDate: Thu Apr 12 10:26:45 2018 -0700

    Issue #1517: make getLastConfirmedEntry in ManagedLedgerImpl return real LAC (#1550)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 12 ++++
 .../apache/pulsar/client/api/TopicReaderTest.java  | 72 ++++++++++++++++------
 2 files changed, 66 insertions(+), 18 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 8649d1d..c0201dc 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -359,7 +359,19 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                         STATE_UPDATER.set(this, State.LedgerOpened);
                         lastLedgerCreatedTimestamp = 
System.currentTimeMillis();
                         currentLedger = lh;
+
                         lastConfirmedEntry = new PositionImpl(lh.getId(), -1);
+                        // bypass empty ledgers, find last ledger with Message 
if possible.
+                        while (lastConfirmedEntry.getEntryId() == -1) {
+                            Map.Entry<Long, LedgerInfo> formerLedger = 
ledgers.lowerEntry(lastConfirmedEntry.getLedgerId());
+                            if (formerLedger != null) {
+                                LedgerInfo ledgerInfo = 
formerLedger.getValue();
+                                lastConfirmedEntry = 
PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1);
+                            } else {
+                                break;
+                            }
+                        }
+
                         LedgerInfo info = 
LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build();
                         ledgers.put(lh.getId(), info);
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
index 12415db..9891d0b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
@@ -60,10 +60,10 @@ public class TopicReaderTest extends ProducerConsumerBase {
 
     @Test
     public void testSimpleReader() throws Exception {
-        Reader<byte[]> reader = 
pulsarClient.newReader().topic("persistent://my-property/use/my-ns/my-topic1")
+        Reader<byte[]> reader = 
pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testSimpleReader")
                 .startMessageId(MessageId.earliest).create();
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1")
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/testSimpleReader")
                 .create();
         for (int i = 0; i < 10; i++) {
             String message = "my-message-" + i;
@@ -88,14 +88,14 @@ public class TopicReaderTest extends ProducerConsumerBase {
 
     @Test
     public void testReaderAfterMessagesWerePublished() throws Exception {
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1")
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/testReaderAfterMessagesWerePublished")
                 .create();
         for (int i = 0; i < 10; i++) {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
         }
 
-        Reader<byte[]> reader = 
pulsarClient.newReader().topic("persistent://my-property/use/my-ns/my-topic1")
+        Reader<byte[]> reader = 
pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testReaderAfterMessagesWerePublished")
                 .startMessageId(MessageId.earliest).create();
 
         Message<byte[]> msg = null;
@@ -116,17 +116,17 @@ public class TopicReaderTest extends ProducerConsumerBase 
{
 
     @Test
     public void testMultipleReaders() throws Exception {
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1")
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/testMultipleReaders")
                 .create();
         for (int i = 0; i < 10; i++) {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
         }
 
-        Reader<byte[]> reader1 = 
pulsarClient.newReader().topic("persistent://my-property/use/my-ns/my-topic1")
+        Reader<byte[]> reader1 = 
pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testMultipleReaders")
                 .startMessageId(MessageId.earliest).create();
 
-        Reader<byte[]> reader2 = 
pulsarClient.newReader().topic("persistent://my-property/use/my-ns/my-topic1")
+        Reader<byte[]> reader2 = 
pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testMultipleReaders")
                 .startMessageId(MessageId.earliest).create();
 
         Message<byte[]> msg = null;
@@ -157,7 +157,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
 
     @Test
     public void testTopicStats() throws Exception {
-        String topicName = "persistent://my-property/use/my-ns/my-topic1";
+        String topicName = "persistent://my-property/use/my-ns/testTopicStats";
 
         Reader<byte[]> reader1 = 
pulsarClient.newReader().topic(topicName).startMessageId(MessageId.earliest).create();
 
@@ -178,14 +178,14 @@ public class TopicReaderTest extends ProducerConsumerBase 
{
 
     @Test
     public void testReaderOnLastMessage() throws Exception {
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1")
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/testReaderOnLastMessage")
                 .create();
         for (int i = 0; i < 10; i++) {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
         }
 
-        Reader<byte[]> reader = 
pulsarClient.newReader().topic("persistent://my-property/use/my-ns/my-topic1")
+        Reader<byte[]> reader = 
pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testReaderOnLastMessage")
                 .startMessageId(MessageId.latest).create();
 
         for (int i = 10; i < 20; i++) {
@@ -213,7 +213,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
 
     @Test
     public void testReaderOnSpecificMessage() throws Exception {
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1")
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/testReaderOnSpecificMessage")
                 .create();
         List<MessageId> messageIds = new ArrayList<>();
         for (int i = 0; i < 10; i++) {
@@ -221,7 +221,7 @@ public class TopicReaderTest extends ProducerConsumerBase {
             messageIds.add(producer.send(message.getBytes()));
         }
 
-        Reader<byte[]> reader = 
pulsarClient.newReader().topic("persistent://my-property/use/my-ns/my-topic1")
+        Reader<byte[]> reader = 
pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testReaderOnSpecificMessage")
                 .startMessageId(messageIds.get(4)).create();
 
         // Publish more messages and verify the readers only sees messages 
starting from the intended message
@@ -354,10 +354,10 @@ public class TopicReaderTest extends ProducerConsumerBase 
{
     }
 
     @Test
-    public void testSimpleReaderReachEndofTopic() throws Exception {
-        Reader<byte[]> reader = 
pulsarClient.newReader().topic("persistent://my-property/use/my-ns/my-topic1")
+    public void testSimpleReaderReachEndOfTopic() throws Exception {
+        Reader<byte[]> reader = 
pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testSimpleReaderReachEndOfTopic")
                 .startMessageId(MessageId.earliest).create();
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1")
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/testSimpleReaderReachEndOfTopic")
                 .create();
 
         // no data write, should return false
@@ -409,13 +409,13 @@ public class TopicReaderTest extends ProducerConsumerBase 
{
     }
 
     @Test
-    public void testReaderReachEndofTopicOnMessageWithBatches() throws 
Exception {
+    public void testReaderReachEndOfTopicOnMessageWithBatches() throws 
Exception {
         Reader<byte[]> reader = pulsarClient.newReader()
-                
.topic("persistent://my-property/use/my-ns/testReaderReachEndofTopicOnMessageWithBatches")
+                
.topic("persistent://my-property/use/my-ns/testReaderReachEndOfTopicOnMessageWithBatches")
                 .startMessageId(MessageId.earliest).create();
 
         Producer<byte[]> producer = pulsarClient.newProducer()
-                
.topic("persistent://my-property/use/my-ns/testReaderReachEndofTopicOnMessageWithBatches")
+                
.topic("persistent://my-property/use/my-ns/testReaderReachEndOfTopicOnMessageWithBatches")
                 .enableBatching(true).batchingMaxPublishDelay(100, 
TimeUnit.MILLISECONDS).create();
 
         // no data write, should return false
@@ -448,4 +448,40 @@ public class TopicReaderTest extends ProducerConsumerBase {
         assertFalse(reader.hasMessageAvailable());
         producer.close();
     }
+
+    @Test
+    public void testMessageAvailableAfterRestart() throws Exception {
+        String topic = 
"persistent://my-property/use/my-ns/testMessageAvailableAfterRestart";
+        String content = "my-message-1";
+
+        // stop retention from cleaning up
+        
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close();
+
+        try (Reader<byte[]> reader = pulsarClient.newReader().topic(topic)
+            .startMessageId(MessageId.earliest).create()) {
+            assertFalse(reader.hasMessageAvailable());
+        }
+
+        try (Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).create()) {
+            producer.send(content.getBytes());
+        }
+
+        try (Reader<byte[]> reader = pulsarClient.newReader().topic(topic)
+            .startMessageId(MessageId.earliest).create()) {
+            assertTrue(reader.hasMessageAvailable());
+        }
+
+        // cause broker to drop topic. Will be loaded next time we access it
+        pulsar.getBrokerService().getTopicReference(topic).get().close().get();
+
+        try (Reader<byte[]> reader = pulsarClient.newReader().topic(topic)
+            .startMessageId(MessageId.earliest).create()) {
+            assertTrue(reader.hasMessageAvailable());
+
+            String readOut = new String(reader.readNext().getData());
+            assertTrue(readOut.equals(content));
+            assertFalse(reader.hasMessageAvailable());
+        }
+
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.

Reply via email to