This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8e80f88cd46 [fix] [broker] fix NPE when calculating a topic's
backlogQuota (#23720)
8e80f88cd46 is described below
commit 8e80f88cd46ad041b87773d49c5ce4420df95b9a
Author: fengyubiao <[email protected]>
AuthorDate: Fri Dec 13 11:36:09 2024 +0800
[fix] [broker] fix NPE when calculating a topic's backlogQuota (#23720)
---
.../broker/service/persistent/PersistentTopic.java | 6 +-
.../PersistentTopicProtectedMethodsTest.java | 114 +++++++++++++++++++++
2 files changed, 118 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 0b2c9d8c7bc..056fad2a005 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3631,7 +3631,8 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
});
}
- private EstimateTimeBasedBacklogQuotaCheckResult
estimatedTimeBasedBacklogQuotaCheck(
+ @VisibleForTesting
+ EstimateTimeBasedBacklogQuotaCheckResult
estimatedTimeBasedBacklogQuotaCheck(
Position markDeletePosition)
throws ExecutionException, InterruptedException {
int backlogQuotaLimitInSecond =
getBacklogQuota(BacklogQuotaType.message_age).getLimitTime();
@@ -3650,7 +3651,8 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
// if the mark-delete position is the last entry it means all entries
for
// that ledger are acknowledged
- if (markDeletePosition.getEntryId() ==
markDeletePositionLedgerInfo.getEntries() - 1) {
+ if (markDeletePositionLedgerInfo != null
+ && (markDeletePosition.getEntryId() ==
markDeletePositionLedgerInfo.getEntries() - 1)) {
Position positionToCheck =
ledger.getNextValidPosition(markDeletePosition);
positionToCheckLedgerInfo =
ledger.getLedgerInfo(positionToCheck.getLedgerId()).get();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicProtectedMethodsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicProtectedMethodsTest.java
new file mode 100644
index 00000000000..1d841483ed7
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicProtectedMethodsTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import static org.testng.Assert.assertEquals;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class PersistentTopicProtectedMethodsTest extends ProducerConsumerBase {
+
+ @BeforeClass(alwaysRun = true)
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ super.producerBaseSetup();
+ }
+
+ @AfterClass(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ protected void doInitConf() throws Exception {
+ this.conf.setPreciseTimeBasedBacklogQuotaCheck(true);
+ this.conf.setManagedLedgerMaxEntriesPerLedger(2);
+ this.conf.setManagedLedgerMaxLedgerRolloverTimeMinutes(10);
+ this.conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
+ }
+
+ /***
+ * Background: the steps for checking backlog metadata are as follows.
+ * - Get the oldest cursor.
+ * - Return the result if the oldest `cursor.md` equals LAC.
+ * - Else, calculate the estimated backlog quota.
+ *
+ * What case is covered by this test:
+ * - The method `PersistentTopic.estimatedTimeBasedBacklogQuotaCheck` may
get an NPE when the
+ * `@param position(cursor.markDeletedPosition)` equals LAC and the
latest ledger has been removed by a
+ * `ML.trimLedgers`, which was introduced by
https://github.com/apache/pulsar/pull/21816.
+ * - Q: The broker checked whether the oldest `cursor.md` equals LAC at
step 2 above, why does it still call
+ * `PersistentTopic.estimatedTimeBasedBacklogQuotaCheck` with a param
that equals `LAC`?
+ * - A: There may be some `acknowledgments` and `ML.trimLedgers` that
happen between step 2 and step 3 above.
+ */
+ @Test
+ public void testEstimatedTimeBasedBacklogQuotaCheckWhenNoBacklog() throws
Exception {
+ final String tp = BrokerTestUtil.newUniqueName("public/default/tp");
+ admin.topics().createNonPartitionedTopic(tp);
+ PersistentTopic persistentTopic = (PersistentTopic)
pulsar.getBrokerService().getTopic(tp, false).join().get();
+ ManagedLedgerImpl ml = (ManagedLedgerImpl)
persistentTopic.getManagedLedger();
+ Consumer c1 =
pulsarClient.newConsumer().topic(tp).subscriptionName("s1").subscribe();
+ ManagedCursorImpl cursor = (ManagedCursorImpl)
ml.getCursors().get("s1");
+
+ // Generate multiple ledgers.
+ Producer<byte[]> p1 = pulsarClient.newProducer().topic(tp).create();
+ byte[] content = new byte[]{1};
+ for (int i = 0; i < 10; i++) {
+ p1.send(content);
+ }
+
+ // Consume all messages.
+ // Trim ledgers, then the LAC refers to a ledger that has been deleted.
+ admin.topics().skipAllMessages(tp, "s1");
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(cursor.getNumberOfEntriesInBacklog(true), 0);
+ assertEquals(cursor.getMarkDeletedPosition(),
ml.getLastConfirmedEntry());
+ });
+ CompletableFuture completableFuture = new CompletableFuture();
+ ml.trimConsumedLedgersInBackground(completableFuture);
+ completableFuture.join();
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(ml.getLedgersInfo().size(), 1);
+ assertEquals(cursor.getNumberOfEntriesInBacklog(true), 0);
+ assertEquals(cursor.getMarkDeletedPosition(),
ml.getLastConfirmedEntry());
+ });
+
+ // Verify: "persistentTopic.estimatedTimeBasedBacklogQuotaCheck" will
not throw a NullPointerException.
+ Position oldestPosition =
ml.getCursors().getCursorWithOldestPosition().getPosition();
+ persistentTopic.estimatedTimeBasedBacklogQuotaCheck(oldestPosition);
+
+ p1.close();
+ c1.close();
+ admin.topics().delete(tp, false);
+ }
+}