sijie closed pull request #1550: Issue #1517: make getLastConfirmedEntry in ManagedLedgerImpl return real LAC
URL: https://github.com/apache/incubator-pulsar/pull/1550
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index bf2cd8c587..85b5879e95 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -357,7 +357,19 @@ public void operationFailed(MetaStoreException e) { STATE_UPDATER.set(this, State.LedgerOpened); lastLedgerCreatedTimestamp = System.currentTimeMillis(); currentLedger = lh; + lastConfirmedEntry = new PositionImpl(lh.getId(), -1); + // bypass empty ledgers, find last ledger with Message if possible. + while (lastConfirmedEntry.getEntryId() == -1) { + Map.Entry<Long, LedgerInfo> formerLedger = ledgers.lowerEntry(lastConfirmedEntry.getLedgerId()); + if (formerLedger != null) { + LedgerInfo ledgerInfo = formerLedger.getValue(); + lastConfirmedEntry = PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1); + } else { + break; + } + } + LedgerInfo info = LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build(); ledgers.put(lh.getId(), info); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java index 12415dbb1d..9891d0bbfc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java @@ -60,10 +60,10 @@ protected void cleanup() throws Exception { @Test public void testSimpleReader() throws Exception { - Reader<byte[]> reader = 
pulsarClient.newReader().topic("persistent://my-property/use/my-ns/my-topic1") + Reader<byte[]> reader = pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testSimpleReader") .startMessageId(MessageId.earliest).create(); - Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1") + Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/testSimpleReader") .create(); for (int i = 0; i < 10; i++) { String message = "my-message-" + i; @@ -88,14 +88,14 @@ public void testSimpleReader() throws Exception { @Test public void testReaderAfterMessagesWerePublished() throws Exception { - Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1") + Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/testReaderAfterMessagesWerePublished") .create(); for (int i = 0; i < 10; i++) { String message = "my-message-" + i; producer.send(message.getBytes()); } - Reader<byte[]> reader = pulsarClient.newReader().topic("persistent://my-property/use/my-ns/my-topic1") + Reader<byte[]> reader = pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testReaderAfterMessagesWerePublished") .startMessageId(MessageId.earliest).create(); Message<byte[]> msg = null; @@ -116,17 +116,17 @@ public void testReaderAfterMessagesWerePublished() throws Exception { @Test public void testMultipleReaders() throws Exception { - Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1") + Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/testMultipleReaders") .create(); for (int i = 0; i < 10; i++) { String message = "my-message-" + i; producer.send(message.getBytes()); } - Reader<byte[]> reader1 = pulsarClient.newReader().topic("persistent://my-property/use/my-ns/my-topic1") + Reader<byte[]> reader1 = 
pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testMultipleReaders") .startMessageId(MessageId.earliest).create(); - Reader<byte[]> reader2 = pulsarClient.newReader().topic("persistent://my-property/use/my-ns/my-topic1") + Reader<byte[]> reader2 = pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testMultipleReaders") .startMessageId(MessageId.earliest).create(); Message<byte[]> msg = null; @@ -157,7 +157,7 @@ public void testMultipleReaders() throws Exception { @Test public void testTopicStats() throws Exception { - String topicName = "persistent://my-property/use/my-ns/my-topic1"; + String topicName = "persistent://my-property/use/my-ns/testTopicStats"; Reader<byte[]> reader1 = pulsarClient.newReader().topic(topicName).startMessageId(MessageId.earliest).create(); @@ -178,14 +178,14 @@ public void testTopicStats() throws Exception { @Test public void testReaderOnLastMessage() throws Exception { - Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1") + Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/testReaderOnLastMessage") .create(); for (int i = 0; i < 10; i++) { String message = "my-message-" + i; producer.send(message.getBytes()); } - Reader<byte[]> reader = pulsarClient.newReader().topic("persistent://my-property/use/my-ns/my-topic1") + Reader<byte[]> reader = pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testReaderOnLastMessage") .startMessageId(MessageId.latest).create(); for (int i = 10; i < 20; i++) { @@ -213,7 +213,7 @@ public void testReaderOnLastMessage() throws Exception { @Test public void testReaderOnSpecificMessage() throws Exception { - Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1") + Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/testReaderOnSpecificMessage") .create(); 
List<MessageId> messageIds = new ArrayList<>(); for (int i = 0; i < 10; i++) { @@ -221,7 +221,7 @@ public void testReaderOnSpecificMessage() throws Exception { messageIds.add(producer.send(message.getBytes())); } - Reader<byte[]> reader = pulsarClient.newReader().topic("persistent://my-property/use/my-ns/my-topic1") + Reader<byte[]> reader = pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testReaderOnSpecificMessage") .startMessageId(messageIds.get(4)).create(); // Publish more messages and verify the readers only sees messages starting from the intended message @@ -354,10 +354,10 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe } @Test - public void testSimpleReaderReachEndofTopic() throws Exception { - Reader<byte[]> reader = pulsarClient.newReader().topic("persistent://my-property/use/my-ns/my-topic1") + public void testSimpleReaderReachEndOfTopic() throws Exception { + Reader<byte[]> reader = pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testSimpleReaderReachEndOfTopic") .startMessageId(MessageId.earliest).create(); - Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1") + Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/testSimpleReaderReachEndOfTopic") .create(); // no data write, should return false @@ -409,13 +409,13 @@ public void testSimpleReaderReachEndofTopic() throws Exception { } @Test - public void testReaderReachEndofTopicOnMessageWithBatches() throws Exception { + public void testReaderReachEndOfTopicOnMessageWithBatches() throws Exception { Reader<byte[]> reader = pulsarClient.newReader() - .topic("persistent://my-property/use/my-ns/testReaderReachEndofTopicOnMessageWithBatches") + .topic("persistent://my-property/use/my-ns/testReaderReachEndOfTopicOnMessageWithBatches") .startMessageId(MessageId.earliest).create(); Producer<byte[]> producer = pulsarClient.newProducer() - 
.topic("persistent://my-property/use/my-ns/testReaderReachEndofTopicOnMessageWithBatches") + .topic("persistent://my-property/use/my-ns/testReaderReachEndOfTopicOnMessageWithBatches") .enableBatching(true).batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS).create(); // no data write, should return false @@ -448,4 +448,40 @@ public void testReaderReachEndofTopicOnMessageWithBatches() throws Exception { assertFalse(reader.hasMessageAvailable()); producer.close(); } + + @Test + public void testMessageAvailableAfterRestart() throws Exception { + String topic = "persistent://my-property/use/my-ns/testMessageAvailableAfterRestart"; + String content = "my-message-1"; + + // stop retention from cleaning up + pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close(); + + try (Reader<byte[]> reader = pulsarClient.newReader().topic(topic) + .startMessageId(MessageId.earliest).create()) { + assertFalse(reader.hasMessageAvailable()); + } + + try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create()) { + producer.send(content.getBytes()); + } + + try (Reader<byte[]> reader = pulsarClient.newReader().topic(topic) + .startMessageId(MessageId.earliest).create()) { + assertTrue(reader.hasMessageAvailable()); + } + + // cause broker to drop topic. Will be loaded next time we access it + pulsar.getBrokerService().getTopicReference(topic).get().close().get(); + + try (Reader<byte[]> reader = pulsarClient.newReader().topic(topic) + .startMessageId(MessageId.earliest).create()) { + assertTrue(reader.hasMessageAvailable()); + + String readOut = new String(reader.readNext().getData()); + assertTrue(readOut.equals(content)); + assertFalse(reader.hasMessageAvailable()); + } + + } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services