sijie closed pull request #1550: Issue #1517: make getLastConfirmedEntry in 
ManagedLedgerImpl return real LAC
URL: https://github.com/apache/incubator-pulsar/pull/1550
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index bf2cd8c587..85b5879e95 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -357,7 +357,19 @@ public void operationFailed(MetaStoreException e) {
                         STATE_UPDATER.set(this, State.LedgerOpened);
                         lastLedgerCreatedTimestamp = 
System.currentTimeMillis();
                         currentLedger = lh;
+
                         lastConfirmedEntry = new PositionImpl(lh.getId(), -1);
+                        // bypass empty ledgers, find last ledger with Message 
if possible.
+                        while (lastConfirmedEntry.getEntryId() == -1) {
+                            Map.Entry<Long, LedgerInfo> formerLedger = 
ledgers.lowerEntry(lastConfirmedEntry.getLedgerId());
+                            if (formerLedger != null) {
+                                LedgerInfo ledgerInfo = 
formerLedger.getValue();
+                                lastConfirmedEntry = 
PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1);
+                            } else {
+                                break;
+                            }
+                        }
+
                         LedgerInfo info = 
LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build();
                         ledgers.put(lh.getId(), info);
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
index 12415dbb1d..9891d0bbfc 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
@@ -60,10 +60,10 @@ protected void cleanup() throws Exception {
 
     @Test
     public void testSimpleReader() throws Exception {
-        Reader<byte[]> reader = 
pulsarClient.newReader().topic("persistent://my-property/use/my-ns/my-topic1")
+        Reader<byte[]> reader = 
pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testSimpleReader")
                 .startMessageId(MessageId.earliest).create();
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1")
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/testSimpleReader")
                 .create();
         for (int i = 0; i < 10; i++) {
             String message = "my-message-" + i;
@@ -88,14 +88,14 @@ public void testSimpleReader() throws Exception {
 
     @Test
     public void testReaderAfterMessagesWerePublished() throws Exception {
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1")
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/testReaderAfterMessagesWerePublished")
                 .create();
         for (int i = 0; i < 10; i++) {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
         }
 
-        Reader<byte[]> reader = 
pulsarClient.newReader().topic("persistent://my-property/use/my-ns/my-topic1")
+        Reader<byte[]> reader = 
pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testReaderAfterMessagesWerePublished")
                 .startMessageId(MessageId.earliest).create();
 
         Message<byte[]> msg = null;
@@ -116,17 +116,17 @@ public void testReaderAfterMessagesWerePublished() throws 
Exception {
 
     @Test
     public void testMultipleReaders() throws Exception {
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1")
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/testMultipleReaders")
                 .create();
         for (int i = 0; i < 10; i++) {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
         }
 
-        Reader<byte[]> reader1 = 
pulsarClient.newReader().topic("persistent://my-property/use/my-ns/my-topic1")
+        Reader<byte[]> reader1 = 
pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testMultipleReaders")
                 .startMessageId(MessageId.earliest).create();
 
-        Reader<byte[]> reader2 = 
pulsarClient.newReader().topic("persistent://my-property/use/my-ns/my-topic1")
+        Reader<byte[]> reader2 = 
pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testMultipleReaders")
                 .startMessageId(MessageId.earliest).create();
 
         Message<byte[]> msg = null;
@@ -157,7 +157,7 @@ public void testMultipleReaders() throws Exception {
 
     @Test
     public void testTopicStats() throws Exception {
-        String topicName = "persistent://my-property/use/my-ns/my-topic1";
+        String topicName = "persistent://my-property/use/my-ns/testTopicStats";
 
         Reader<byte[]> reader1 = 
pulsarClient.newReader().topic(topicName).startMessageId(MessageId.earliest).create();
 
@@ -178,14 +178,14 @@ public void testTopicStats() throws Exception {
 
     @Test
     public void testReaderOnLastMessage() throws Exception {
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1")
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/testReaderOnLastMessage")
                 .create();
         for (int i = 0; i < 10; i++) {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
         }
 
-        Reader<byte[]> reader = 
pulsarClient.newReader().topic("persistent://my-property/use/my-ns/my-topic1")
+        Reader<byte[]> reader = 
pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testReaderOnLastMessage")
                 .startMessageId(MessageId.latest).create();
 
         for (int i = 10; i < 20; i++) {
@@ -213,7 +213,7 @@ public void testReaderOnLastMessage() throws Exception {
 
     @Test
     public void testReaderOnSpecificMessage() throws Exception {
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1")
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/testReaderOnSpecificMessage")
                 .create();
         List<MessageId> messageIds = new ArrayList<>();
         for (int i = 0; i < 10; i++) {
@@ -221,7 +221,7 @@ public void testReaderOnSpecificMessage() throws Exception {
             messageIds.add(producer.send(message.getBytes()));
         }
 
-        Reader<byte[]> reader = 
pulsarClient.newReader().topic("persistent://my-property/use/my-ns/my-topic1")
+        Reader<byte[]> reader = 
pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testReaderOnSpecificMessage")
                 .startMessageId(messageIds.get(4)).create();
 
         // Publish more messages and verify the readers only sees messages 
starting from the intended message
@@ -354,10 +354,10 @@ public EncryptionKeyInfo getPrivateKey(String keyName, 
Map<String, String> keyMe
     }
 
     @Test
-    public void testSimpleReaderReachEndofTopic() throws Exception {
-        Reader<byte[]> reader = 
pulsarClient.newReader().topic("persistent://my-property/use/my-ns/my-topic1")
+    public void testSimpleReaderReachEndOfTopic() throws Exception {
+        Reader<byte[]> reader = 
pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testSimpleReaderReachEndOfTopic")
                 .startMessageId(MessageId.earliest).create();
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1")
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/testSimpleReaderReachEndOfTopic")
                 .create();
 
         // no data write, should return false
@@ -409,13 +409,13 @@ public void testSimpleReaderReachEndofTopic() throws 
Exception {
     }
 
     @Test
-    public void testReaderReachEndofTopicOnMessageWithBatches() throws 
Exception {
+    public void testReaderReachEndOfTopicOnMessageWithBatches() throws 
Exception {
         Reader<byte[]> reader = pulsarClient.newReader()
-                
.topic("persistent://my-property/use/my-ns/testReaderReachEndofTopicOnMessageWithBatches")
+                
.topic("persistent://my-property/use/my-ns/testReaderReachEndOfTopicOnMessageWithBatches")
                 .startMessageId(MessageId.earliest).create();
 
         Producer<byte[]> producer = pulsarClient.newProducer()
-                
.topic("persistent://my-property/use/my-ns/testReaderReachEndofTopicOnMessageWithBatches")
+                
.topic("persistent://my-property/use/my-ns/testReaderReachEndOfTopicOnMessageWithBatches")
                 .enableBatching(true).batchingMaxPublishDelay(100, 
TimeUnit.MILLISECONDS).create();
 
         // no data write, should return false
@@ -448,4 +448,40 @@ public void 
testReaderReachEndofTopicOnMessageWithBatches() throws Exception {
         assertFalse(reader.hasMessageAvailable());
         producer.close();
     }
+
+    @Test
+    public void testMessageAvailableAfterRestart() throws Exception {
+        String topic = 
"persistent://my-property/use/my-ns/testMessageAvailableAfterRestart";
+        String content = "my-message-1";
+
+        // stop retention from cleaning up
+        
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close();
+
+        try (Reader<byte[]> reader = pulsarClient.newReader().topic(topic)
+            .startMessageId(MessageId.earliest).create()) {
+            assertFalse(reader.hasMessageAvailable());
+        }
+
+        try (Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).create()) {
+            producer.send(content.getBytes());
+        }
+
+        try (Reader<byte[]> reader = pulsarClient.newReader().topic(topic)
+            .startMessageId(MessageId.earliest).create()) {
+            assertTrue(reader.hasMessageAvailable());
+        }
+
+        // cause broker to drop topic. Will be loaded next time we access it
+        pulsar.getBrokerService().getTopicReference(topic).get().close().get();
+
+        try (Reader<byte[]> reader = pulsarClient.newReader().topic(topic)
+            .startMessageId(MessageId.earliest).create()) {
+            assertTrue(reader.hasMessageAvailable());
+
+            String readOut = new String(reader.readNext().getData());
+            assertTrue(readOut.equals(content));
+            assertFalse(reader.hasMessageAvailable());
+        }
+
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to