This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e80f88cd46 [fix] [broker] fix NPE when calculating a topic's 
backlogQuota (#23720)
8e80f88cd46 is described below

commit 8e80f88cd46ad041b87773d49c5ce4420df95b9a
Author: fengyubiao <[email protected]>
AuthorDate: Fri Dec 13 11:36:09 2024 +0800

    [fix] [broker] fix NPE when calculating a topic's backlogQuota (#23720)
---
 .../broker/service/persistent/PersistentTopic.java |   6 +-
 .../PersistentTopicProtectedMethodsTest.java       | 114 +++++++++++++++++++++
 2 files changed, 118 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 0b2c9d8c7bc..056fad2a005 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3631,7 +3631,8 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         });
     }
 
-    private EstimateTimeBasedBacklogQuotaCheckResult 
estimatedTimeBasedBacklogQuotaCheck(
+    @VisibleForTesting
+    EstimateTimeBasedBacklogQuotaCheckResult 
estimatedTimeBasedBacklogQuotaCheck(
             Position markDeletePosition)
             throws ExecutionException, InterruptedException {
         int backlogQuotaLimitInSecond = 
getBacklogQuota(BacklogQuotaType.message_age).getLimitTime();
@@ -3650,7 +3651,8 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 
         // if the mark-delete position is the last entry it means all entries 
for
         // that ledger are acknowledged
-        if (markDeletePosition.getEntryId() == 
markDeletePositionLedgerInfo.getEntries() - 1) {
+        if (markDeletePositionLedgerInfo != null
+                && (markDeletePosition.getEntryId() == 
markDeletePositionLedgerInfo.getEntries() - 1)) {
             Position positionToCheck = 
ledger.getNextValidPosition(markDeletePosition);
             positionToCheckLedgerInfo = 
ledger.getLedgerInfo(positionToCheck.getLedgerId()).get();
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicProtectedMethodsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicProtectedMethodsTest.java
new file mode 100644
index 00000000000..1d841483ed7
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicProtectedMethodsTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import static org.testng.Assert.assertEquals;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Exercises non-public methods of {@link PersistentTopic} by calling them
+ * directly from the same package.
+ */
+@Slf4j
+@Test(groups = "broker")
+public class PersistentTopicProtectedMethodsTest extends ProducerConsumerBase {
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    /**
+     * Broker settings used by this test: turns on the precise time-based
+     * backlog quota check and caps every ledger at 2 entries, so that the
+     * few messages published below are spread over several ledgers.
+     * NOTE(review): there is no {@code @Override} here; presumably this is a
+     * base-class hook that runs during setup -- confirm.
+     * NOTE(review): the min/max rollover times look chosen so that rollover
+     * is driven by the entry count alone -- confirm.
+     */
+    protected void doInitConf() throws Exception {
+        this.conf.setPreciseTimeBasedBacklogQuotaCheck(true);
+        this.conf.setManagedLedgerMaxEntriesPerLedger(2);
+        this.conf.setManagedLedgerMaxLedgerRolloverTimeMinutes(10);
+        this.conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
+    }
+
+    /**
+     * Background: the steps for checking backlog metadata are as follows.
+     * 1. Get the oldest cursor.
+     * 2. Return the result if the mark-delete position of the oldest cursor
+     *    (`cursor.md`) equals the LAC.
+     * 3. Otherwise, calculate the estimated backlog quota.
+     *
+     * Which case is covered by this test:
+     * - `PersistentTopic.estimatedTimeBasedBacklogQuotaCheck` may throw an
+     *   NPE when its position argument (`cursor.markDeletedPosition`)
+     *   equals the LAC and the ledger that the LAC belongs to has already
+     *   been removed by `ML.trimLedgers`. The NPE was introduced by
+     *   https://github.com/apache/pulsar/pull/21816.
+     * - Q: The broker already checks at step 2 whether the oldest
+     *      `cursor.md` equals the LAC, so why is
+     *      `PersistentTopic.estimatedTimeBasedBacklogQuotaCheck` still
+     *      called with a position that equals the LAC?
+     *   - A: Acknowledgments and `ML.trimLedgers` may happen between
+     *        step 2 and step 3.
+     */
+    @Test
+    public void testEstimatedTimeBasedBacklogQuotaCheckWhenNoBacklog() throws 
Exception {
+        final String tp = BrokerTestUtil.newUniqueName("public/default/tp");
+        admin.topics().createNonPartitionedTopic(tp);
+        PersistentTopic persistentTopic = (PersistentTopic) 
pulsar.getBrokerService().getTopic(tp, false).join().get();
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) 
persistentTopic.getManagedLedger();
+        Consumer c1 = 
pulsarClient.newConsumer().topic(tp).subscriptionName("s1").subscribe();
+        ManagedCursorImpl cursor = (ManagedCursorImpl) 
ml.getCursors().get("s1");
+
+        // Publish enough messages to span multiple ledgers (2 entries each).
+        Producer<byte[]> p1 = pulsarClient.newProducer().topic(tp).create();
+        byte[] content = new byte[]{1};
+        for (int i = 0; i < 10; i++) {
+            p1.send(content);
+        }
+
+        // Skip all messages so that the subscription has no backlog left.
+        // Then trim ledgers, so the LAC refers to a ledger that was deleted.
+        admin.topics().skipAllMessages(tp, "s1");
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(cursor.getNumberOfEntriesInBacklog(true), 0);
+            assertEquals(cursor.getMarkDeletedPosition(), 
ml.getLastConfirmedEntry());
+        });
+        // Trigger a trim explicitly and block until it has completed.
+        CompletableFuture completableFuture = new CompletableFuture();
+        ml.trimConsumedLedgersInBackground(completableFuture);
+        completableFuture.join();
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(ml.getLedgersInfo().size(), 1);
+            assertEquals(cursor.getNumberOfEntriesInBacklog(true), 0);
+            assertEquals(cursor.getMarkDeletedPosition(), 
ml.getLastConfirmedEntry());
+        });
+
+        // Verify: "persistentTopic.estimatedTimeBasedBacklogQuotaCheck" will 
not get a NullPointerException.
+        Position oldestPosition = 
ml.getCursors().getCursorWithOldestPosition().getPosition();
+        persistentTopic.estimatedTimeBasedBacklogQuotaCheck(oldestPosition);
+
+        p1.close();
+        c1.close();
+        admin.topics().delete(tp, false);
+    }
+}

Reply via email to