This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d809104e9dc940a85f2d3ecf1b372bf2833ac0cc Author: lipenghui <[email protected]> AuthorDate: Fri Jun 18 16:47:08 2021 +0800 Fix NonRecoverableLedgerException when get last message ID by Reader (#10957) * Fix NonRecoverableLedgerException when get last message ID by Reader If a topic only has non-durable subscriptions, or the mark-delete positions of all its durable subscriptions have reached the LAC, all the ledgers except the current ledger will be deleted. Since the current ledger may not have any data, a NonRecoverableLedgerException occurs on the broker side. In this case, we should return the message ID (-1, -1). (cherry picked from commit 80f42e20099abac4002e8f84bbdcaba7aa57a8bd) --- .../broker/admin/impl/PersistentTopicsBase.java | 1 + .../apache/pulsar/broker/service/ServerCnx.java | 15 +++++-- .../SystemTopicBasedTopicPoliciesService.java | 12 ++++++ .../broker/service/persistent/PersistentTopic.java | 46 ++++++++++++++-------- .../broker/admin/AdminApiGetLastMessageIdTest.java | 45 ++++++++++++++++++++- 5 files changed, 99 insertions(+), 20 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 97f4a8d..4a03cd1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1127,6 +1127,7 @@ public class PersistentTopicsBase extends AdminResource { } return topic.getInternalStats(metadata).get(); } catch (Exception e) { + log.error("[{}] Failed to get internal stats for {}", clientAppId(), topicName, e); throw new RestException(Status.INTERNAL_SERVER_ERROR, (e instanceof ExecutionException) ? 
e.getCause().getMessage() : e.getMessage()); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index c446833..8006b89 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1694,9 +1694,18 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { batchSizeFuture.whenComplete((batchSize, e) -> { if (e != null) { - ctx.writeAndFlush(Commands.newError( - requestId, ServerError.MetadataError, - "Failed to get batch size for entry " + e.getMessage())); + if (e.getCause() instanceof ManagedLedgerException.NonRecoverableLedgerException) { + // in this case, the ledgers been removed except the current ledger + // and current ledger without any data + ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, + -1, -1, partitionIndex, -1, + markDeletePosition != null ? markDeletePosition.getLedgerId() : -1, + markDeletePosition != null ? markDeletePosition.getEntryId() : -1)); + } else { + ctx.writeAndFlush(Commands.newError( + requestId, ServerError.MetadataError, + "Failed to get batch size for entry " + e.getMessage())); + } } else { int largestBatchIndex = batchSize > 0 ? 
batchSize - 1 : -1; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 8cbbde4..807da68 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -184,6 +184,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic readerCaches.put(namespace, readerCompletableFuture); readerCompletableFuture.whenComplete((reader, ex) -> { if (ex != null) { + log.error("[{}] Failed to create reader on __change_events topic", namespace, ex); result.completeExceptionally(ex); } else { initPolicesCache(reader, result); @@ -239,19 +240,30 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> reader, CompletableFuture<Void> future) { reader.hasMoreEventsAsync().whenComplete((hasMore, ex) -> { if (ex != null) { + log.error("[{}] Failed to check the move events for the system topic", + reader.getSystemTopic().getTopicName(), ex); future.completeExceptionally(ex); readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject()); } if (hasMore) { reader.readNextAsync().whenComplete((msg, e) -> { if (e != null) { + log.error("[{}] Failed to read event from the system topic.", + reader.getSystemTopic().getTopicName(), ex); future.completeExceptionally(e); readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject()); } refreshTopicPoliciesCache(msg); + if (log.isDebugEnabled()) { + log.debug("[{}] Loop next event reading for system topic.", + reader.getSystemTopic().getTopicName().getNamespaceObject()); + } initPolicesCache(reader, future); }); } else { + if (log.isDebugEnabled()) { + 
log.debug("[{}] Reach the end of the system topic.", reader.getSystemTopic().getTopicName()); + } future.complete(null); policyCacheInitMap.computeIfPresent( reader.getSystemTopic().getTopicName().getNamespaceObject(), (k, v) -> true); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index aa73813..38b3264 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; import java.util.function.BiFunction; import java.util.stream.Collectors; +import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; @@ -1940,22 +1941,35 @@ public class PersistentTopic extends AbstractTopic ledgers.forEach(ledgerId -> { CompletableFuture<Void> completableFuture = new CompletableFuture<>(); getLedgerMetadataFutures.add(completableFuture); - brokerService.getPulsar().getBookKeeperClient() - .getLedgerMetadata(ledgerId) - .thenAccept(metadata -> { - LedgerInfo schemaLedgerInfo = new LedgerInfo(); - schemaLedgerInfo.ledgerId = metadata.getLedgerId(); - schemaLedgerInfo.entries = metadata.getLastEntryId() + 1; - schemaLedgerInfo.size = metadata.getLength(); - if (includeLedgerMetadata) { - info.metadata = metadata.toSafeString(); - } - stats.schemaLedgers.add(schemaLedgerInfo); - completableFuture.complete(null); - }).exceptionally(e -> { - completableFuture.completeExceptionally(e); - return null; - }); + CompletableFuture<LedgerMetadata> metadataFuture = null; + try { + metadataFuture = 
brokerService.getPulsar().getBookKeeperClient() + .getLedgerMetadata(ledgerId); + } catch (NullPointerException e) { + // related to bookkeeper issue https://github.com/apache/bookkeeper/issues/2741 + if (log.isDebugEnabled()) { + log.debug("{{}} Failed to get ledger metadata for the schema ledger {}", + topic, ledgerId, e); + } + } + if (metadataFuture != null) { + metadataFuture.thenAccept(metadata -> { + LedgerInfo schemaLedgerInfo = new LedgerInfo(); + schemaLedgerInfo.ledgerId = metadata.getLedgerId(); + schemaLedgerInfo.entries = metadata.getLastEntryId() + 1; + schemaLedgerInfo.size = metadata.getLength(); + if (includeLedgerMetadata) { + info.metadata = metadata.toSafeString(); + } + stats.schemaLedgers.add(schemaLedgerInfo); + completableFuture.complete(null); + }).exceptionally(e -> { + completableFuture.completeExceptionally(e); + return null; + }); + } else { + completableFuture.complete(null); + } }); FutureUtil.waitForAll(getLedgerMetadataFutures).thenRun(() -> { schemaStoreLedgersFuture.complete(null); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java index b672dfd..533a2a5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java @@ -22,14 +22,19 @@ import com.google.common.collect.Sets; import org.apache.pulsar.broker.admin.v2.PersistentTopics; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationDataHttps; +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.web.PulsarWebResource; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageRoutingMode; import 
org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.ClusterDataImpl; +import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; @@ -43,6 +48,8 @@ import java.lang.reflect.Field; import java.util.Collection; import java.util.Date; import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import static org.mockito.Mockito.doNothing; @@ -218,4 +225,40 @@ public class AdminApiGetLastMessageIdTest extends MockedPulsarServiceBaseTest { Assert.assertTrue(((MessageIdImpl)id[0]).getLedgerId() > 0); Assert.assertEquals( 2 * numberOfMessages -1, ((MessageIdImpl)id[0]).getEntryId()); } + + /** + * If a topic only have non-durable subscriptions or mark delete position of all the durable subscriptions + * are reached the LAC, all the ledgers except the current ledger will be deleted. Since the current ledger may not + * have any data, so the test is to ensure the get last message ID API can work in this case. + * + * In this case, the we should return the message ID (-1, -1). 
+ */ + @Test + public void testGetLastMessageIdWhenTopicWithoutData() throws Exception { + final String topic = "persistent://prop/ns-abc/testGetLastMessageIdWhenTopicWithoutData-" + UUID.randomUUID(); + Producer<String> producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .create(); + final int messages = 10; + for (int i = 0; i < messages; i++) { + producer.send("Message - " + i); + } + // To trigger the ledger rollover + admin.topics().unload(topic); + Topic topicRef = pulsar.getBrokerService().getTopicIfExists(topic).get().get(); + ((PersistentTopic) topicRef).getManagedLedger().trimConsumedLedgersInBackground(new CompletableFuture<>()); + Awaitility.await().untilAsserted(() -> { + PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic); + Assert.assertEquals(stats.ledgers.size(), 1); + }); + Reader<String> reader = pulsarClient.newReader(Schema.STRING) + .topic(topic) + .startMessageId(MessageId.earliest) + .create(); + // This will call the get last message ID api. + boolean hasMessage = reader.hasMessageAvailable(); + Assert.assertFalse(hasMessage); + MessageId messageId = admin.topics().getLastMessageId(topic); + Assert.assertEquals(messageId.toString(), "-1:-1:-1"); + } }
