This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d809104e9dc940a85f2d3ecf1b372bf2833ac0cc
Author: lipenghui <[email protected]>
AuthorDate: Fri Jun 18 16:47:08 2021 +0800

    Fix NonRecoverableLedgerException when getting the last message ID by Reader (#10957)
    
    * Fix NonRecoverableLedgerException when getting the last message ID by Reader
    
    If a topic has only non-durable subscriptions, or the mark-delete positions of
    all its durable subscriptions have reached the LAC, all the ledgers except the
    current ledger will be deleted. Since the current ledger may not have any data,
    a NonRecoverableLedgerException occurs on the broker side when the last
    message ID is requested.
    
    In this case, we should return the message ID (-1, -1).
    
    (cherry picked from commit 80f42e20099abac4002e8f84bbdcaba7aa57a8bd)
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  1 +
 .../apache/pulsar/broker/service/ServerCnx.java    | 15 +++++--
 .../SystemTopicBasedTopicPoliciesService.java      | 12 ++++++
 .../broker/service/persistent/PersistentTopic.java | 46 ++++++++++++++--------
 .../broker/admin/AdminApiGetLastMessageIdTest.java | 45 ++++++++++++++++++++-
 5 files changed, 99 insertions(+), 20 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 97f4a8d..4a03cd1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1127,6 +1127,7 @@ public class PersistentTopicsBase extends AdminResource {
             }
             return topic.getInternalStats(metadata).get();
         } catch (Exception e) {
+            log.error("[{}] Failed to get internal stats for {}", 
clientAppId(), topicName, e);
             throw new RestException(Status.INTERNAL_SERVER_ERROR,
                     (e instanceof ExecutionException) ? 
e.getCause().getMessage() : e.getMessage());
         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index c446833..8006b89 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1694,9 +1694,18 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
         batchSizeFuture.whenComplete((batchSize, e) -> {
             if (e != null) {
-                ctx.writeAndFlush(Commands.newError(
-                        requestId, ServerError.MetadataError,
-                        "Failed to get batch size for entry " + 
e.getMessage()));
+                if (e.getCause() instanceof 
ManagedLedgerException.NonRecoverableLedgerException) {
+                    // in this case, the ledgers been removed except the 
current ledger
+                    // and current ledger without any data
+                    
ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
+                            -1, -1, partitionIndex, -1,
+                            markDeletePosition != null ? 
markDeletePosition.getLedgerId() : -1,
+                            markDeletePosition != null ? 
markDeletePosition.getEntryId() : -1));
+                } else {
+                    ctx.writeAndFlush(Commands.newError(
+                            requestId, ServerError.MetadataError,
+                            "Failed to get batch size for entry " + 
e.getMessage()));
+                }
             } else {
                 int largestBatchIndex = batchSize > 0 ? batchSize - 1 : -1;
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 8cbbde4..807da68 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -184,6 +184,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                 readerCaches.put(namespace, readerCompletableFuture);
                 readerCompletableFuture.whenComplete((reader, ex) -> {
                     if (ex != null) {
+                        log.error("[{}] Failed to create reader on 
__change_events topic", namespace, ex);
                         result.completeExceptionally(ex);
                     } else {
                         initPolicesCache(reader, result);
@@ -239,19 +240,30 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
     private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> 
reader, CompletableFuture<Void> future) {
         reader.hasMoreEventsAsync().whenComplete((hasMore, ex) -> {
             if (ex != null) {
+                log.error("[{}] Failed to check the move events for the system 
topic",
+                        reader.getSystemTopic().getTopicName(), ex);
                 future.completeExceptionally(ex);
                 
readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
             }
             if (hasMore) {
                 reader.readNextAsync().whenComplete((msg, e) -> {
                     if (e != null) {
+                        log.error("[{}] Failed to read event from the system 
topic.",
+                                reader.getSystemTopic().getTopicName(), ex);
                         future.completeExceptionally(e);
                         
readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
                     }
                     refreshTopicPoliciesCache(msg);
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Loop next event reading for system 
topic.",
+                                
reader.getSystemTopic().getTopicName().getNamespaceObject());
+                    }
                     initPolicesCache(reader, future);
                 });
             } else {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Reach the end of the system topic.", 
reader.getSystemTopic().getTopicName());
+                }
                 future.complete(null);
                 policyCacheInitMap.computeIfPresent(
                         
reader.getSystemTopic().getTopicName().getNamespaceObject(), (k, v) -> true);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index aa73813..38b3264 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
@@ -1940,22 +1941,35 @@ public class PersistentTopic extends AbstractTopic
                         ledgers.forEach(ledgerId -> {
                             CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
                             getLedgerMetadataFutures.add(completableFuture);
-                            brokerService.getPulsar().getBookKeeperClient()
-                                    .getLedgerMetadata(ledgerId)
-                                    .thenAccept(metadata -> {
-                                        LedgerInfo schemaLedgerInfo = new 
LedgerInfo();
-                                        schemaLedgerInfo.ledgerId = 
metadata.getLedgerId();
-                                        schemaLedgerInfo.entries = 
metadata.getLastEntryId() + 1;
-                                        schemaLedgerInfo.size = 
metadata.getLength();
-                                        if (includeLedgerMetadata) {
-                                            info.metadata = 
metadata.toSafeString();
-                                        }
-                                        
stats.schemaLedgers.add(schemaLedgerInfo);
-                                        completableFuture.complete(null);
-                                    }).exceptionally(e -> {
-                                completableFuture.completeExceptionally(e);
-                                return null;
-                            });
+                            CompletableFuture<LedgerMetadata> metadataFuture = 
null;
+                            try {
+                                metadataFuture = 
brokerService.getPulsar().getBookKeeperClient()
+                                    .getLedgerMetadata(ledgerId);
+                            } catch (NullPointerException e) {
+                                // related to bookkeeper issue 
https://github.com/apache/bookkeeper/issues/2741
+                                if (log.isDebugEnabled()) {
+                                    log.debug("{{}} Failed to get ledger 
metadata for the schema ledger {}",
+                                            topic, ledgerId, e);
+                                }
+                            }
+                            if (metadataFuture != null) {
+                                metadataFuture.thenAccept(metadata -> {
+                                    LedgerInfo schemaLedgerInfo = new 
LedgerInfo();
+                                    schemaLedgerInfo.ledgerId = 
metadata.getLedgerId();
+                                    schemaLedgerInfo.entries = 
metadata.getLastEntryId() + 1;
+                                    schemaLedgerInfo.size = 
metadata.getLength();
+                                    if (includeLedgerMetadata) {
+                                        info.metadata = 
metadata.toSafeString();
+                                    }
+                                    stats.schemaLedgers.add(schemaLedgerInfo);
+                                    completableFuture.complete(null);
+                                }).exceptionally(e -> {
+                                    completableFuture.completeExceptionally(e);
+                                    return null;
+                                });
+                            } else {
+                                completableFuture.complete(null);
+                            }
                         });
                         
FutureUtil.waitForAll(getLedgerMetadataFutures).thenRun(() -> {
                             schemaStoreLedgersFuture.complete(null);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java
index b672dfd..533a2a5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java
@@ -22,14 +22,19 @@ import com.google.common.collect.Sets;
 import org.apache.pulsar.broker.admin.v2.PersistentTopics;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
@@ -43,6 +48,8 @@ import java.lang.reflect.Field;
 import java.util.Collection;
 import java.util.Date;
 import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import static org.mockito.Mockito.doNothing;
@@ -218,4 +225,40 @@ public class AdminApiGetLastMessageIdTest extends 
MockedPulsarServiceBaseTest {
         Assert.assertTrue(((MessageIdImpl)id[0]).getLedgerId() > 0);
         Assert.assertEquals( 2 * numberOfMessages -1, 
((MessageIdImpl)id[0]).getEntryId());
     }
+
+    /**
+     * If a topic only have non-durable subscriptions or mark delete position 
of all the durable subscriptions
+     * are reached the LAC, all the ledgers except the current ledger will be 
deleted. Since the current ledger may not
+     * have any data, so the test is to ensure the get last message ID API can 
work in this case.
+     *
+     * In this case, the we should return the message ID (-1, -1).
+     */
+    @Test
+    public void testGetLastMessageIdWhenTopicWithoutData() throws Exception {
+        final String topic = 
"persistent://prop/ns-abc/testGetLastMessageIdWhenTopicWithoutData-" + 
UUID.randomUUID();
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+        final int messages = 10;
+        for (int i = 0; i < messages; i++) {
+            producer.send("Message - " + i);
+        }
+        // To trigger the ledger rollover
+        admin.topics().unload(topic);
+        Topic topicRef = 
pulsar.getBrokerService().getTopicIfExists(topic).get().get();
+        ((PersistentTopic) 
topicRef).getManagedLedger().trimConsumedLedgersInBackground(new 
CompletableFuture<>());
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopicInternalStats stats = 
admin.topics().getInternalStats(topic);
+            Assert.assertEquals(stats.ledgers.size(), 1);
+        });
+        Reader<String> reader = pulsarClient.newReader(Schema.STRING)
+                .topic(topic)
+                .startMessageId(MessageId.earliest)
+                .create();
+        // This will call the get last message ID api.
+        boolean hasMessage = reader.hasMessageAvailable();
+        Assert.assertFalse(hasMessage);
+        MessageId messageId = admin.topics().getLastMessageId(topic);
+        Assert.assertEquals(messageId.toString(), "-1:-1:-1");
+    }
 }

Reply via email to