This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 90c4653aea8 [fix][broker] Fix creating producer failure when set
backlog quota. (#15663)
90c4653aea8 is described below
commit 90c4653aea848953696599c72553952b3ef0b53f
Author: Jiwei Guo <[email protected]>
AuthorDate: Wed May 25 05:54:55 2022 -0700
[fix][broker] Fix creating producer failure when set backlog quota. (#15663)
While trying to reproduce the problem in #15609 using the master branch, it
was found that master also has this bug. The root cause is:
When there is only one ledger in the ManagedLedger, after the current
ledger is closed, it has a timestamp, and its age exceeds the time set by
the backlog quota, which causes producer creation to fail.
The added test reproduces this.
So when there is only one ledger, we should not exclude it.
If this patch is reverted, the added test will fail.
(cherry picked from commit 3a8045851f7e9ea62da104dab2b7fe2b47a95ca9)
---
.../broker/service/persistent/PersistentTopic.java | 47 +++++++++++-----
.../systopic/PartitionedSystemTopicTest.java | 64 ++++++++++++++++++++++
2 files changed, 96 insertions(+), 15 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 6bcfa36320c..db824b0706b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2626,22 +2626,9 @@ public class PersistentTopic extends AbstractTopic
return false;
}
} else {
- Long ledgerId = ((ManagedCursorContainer)
ledger.getCursors()).getSlowestReaderPosition().getLedgerId();
+ PositionImpl slowestPosition = ((ManagedCursorContainer)
ledger.getCursors()).getSlowestReaderPosition();
try {
-
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo
- ledgerInfo = ledger.getLedgerInfo(ledgerId).get();
- if (ledgerInfo != null && ledgerInfo.hasTimestamp() &&
ledgerInfo.getTimestamp() > 0
- && ((ManagedLedgerImpl) ledger).getClock().millis() -
ledgerInfo.getTimestamp()
- > backlogQuotaLimitInSecond * 1000) {
- if (log.isDebugEnabled()) {
- log.debug("Time based backlog quota exceeded, quota
{}, age of ledger "
- + "slowest cursor currently on {}",
backlogQuotaLimitInSecond * 1000,
- ((ManagedLedgerImpl)
ledger).getClock().millis() - ledgerInfo.getTimestamp());
- }
- return true;
- } else {
- return false;
- }
+ return
slowestReaderTimeBasedBacklogQuotaCheck(slowestPosition);
} catch (Exception e) {
log.error("[{}][{}] Error reading entry for precise time based
backlog check", topicName, e);
return false;
@@ -2649,6 +2636,36 @@ public class PersistentTopic extends AbstractTopic
}
}
+ private boolean slowestReaderTimeBasedBacklogQuotaCheck(PositionImpl
slowestPosition)
+ throws ExecutionException, InterruptedException {
+ int backlogQuotaLimitInSecond =
getBacklogQuota(BacklogQuota.BacklogQuotaType.message_age).getLimitTime();
+ Long ledgerId = slowestPosition.getLedgerId();
+ if (((ManagedLedgerImpl)
ledger).getLedgersInfo().lastKey().equals(ledgerId)) {
+ return false;
+ }
+ int result;
+
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo
+ ledgerInfo = ledger.getLedgerInfo(ledgerId).get();
+ if (ledgerInfo != null && ledgerInfo.hasTimestamp() &&
ledgerInfo.getTimestamp() > 0
+ && ((ManagedLedgerImpl) ledger).getClock().millis() -
ledgerInfo.getTimestamp()
+ > backlogQuotaLimitInSecond * 1000 && (result =
slowestPosition.compareTo(
+ new PositionImpl(ledgerInfo.getLedgerId(),
ledgerInfo.getEntries() - 1))) <= 0) {
+ if (result < 0) {
+ if (log.isDebugEnabled()) {
+ log.debug("Time based backlog quota exceeded, quota {},
age of ledger "
+ + "slowest cursor currently on {}",
backlogQuotaLimitInSecond * 1000,
+ ((ManagedLedgerImpl) ledger).getClock().millis() -
ledgerInfo.getTimestamp());
+ }
+ return true;
+ } else {
+ return slowestReaderTimeBasedBacklogQuotaCheck(
+ ((ManagedLedgerImpl)
ledger).getNextValidPosition(slowestPosition));
+ }
+ } else {
+ return false;
+ }
+ }
+
@Override
public boolean isReplicated() {
return !replicators.isEmpty();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index d506f712767..cf45d614f0f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -27,14 +27,18 @@ import org.apache.commons.lang.RandomStringUtils;
import org.apache.pulsar.broker.admin.impl.BrokersBase;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.events.EventsTopicNames;
import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.naming.TopicName;
@@ -43,6 +47,7 @@ import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
+import org.powermock.reflect.Whitebox;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -50,6 +55,7 @@ import org.testng.annotations.Test;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -65,6 +71,8 @@ public class PartitionedSystemTopicTest extends
BrokerTestBase {
conf.setAllowAutoTopicCreation(false);
conf.setAllowAutoTopicCreationType("partitioned");
conf.setDefaultNumPartitions(PARTITIONS);
+ conf.setManagedLedgerMaxEntriesPerLedger(1);
+ conf.setBrokerDeleteInactiveTopicsEnabled(false);
conf.setSystemTopicEnabled(true);
conf.setTopicLevelPoliciesEnabled(true);
@@ -170,4 +178,60 @@ public class PartitionedSystemTopicTest extends
BrokerTestBase {
});
}
+ @Test
+ private void testSetBacklogCausedCreatingProducerFailure() throws
Exception {
+ final String ns = "prop/ns-test";
+ final String topic = ns + "/topic-1";
+
+ admin.namespaces().createNamespace(ns, 2);
+ admin.topics().createPartitionedTopic(String.format("persistent://%s",
topic), 1);
+ BacklogQuota quota = BacklogQuota.builder()
+ .limitTime(2)
+ .limitSize(-1)
+
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
+ .build();
+ admin.namespaces().setBacklogQuota(ns, quota,
BacklogQuota.BacklogQuotaType.message_age);
+
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .create();
+
+ String partition0 = TopicName.get(String.format("persistent://%s",
topic)).getPartition(0).toString();
+ Optional<Topic> topicReference =
pulsar.getBrokerService().getTopicReference(partition0);
+ Assert.assertTrue(topicReference.isPresent());
+ PersistentTopic persistentTopic = (PersistentTopic)
topicReference.get();
+ ManagedLedgerConfig config =
persistentTopic.getManagedLedger().getConfig();
+ config.setMinimumRolloverTime(1, TimeUnit.SECONDS);
+ config.setMaximumRolloverTime(1, TimeUnit.SECONDS);
+ persistentTopic.getManagedLedger().setConfig(config);
+ Whitebox.invokeMethod(persistentTopic.getManagedLedger(),
"updateLastLedgerCreatedTimeAndScheduleRolloverTask");
+ String msg1 = "msg-1";
+ producer.send(msg1);
+ Thread.sleep(3 * 1000);
+
+ Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("sub-1")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .subscribe();
+
+ Message<String> receive = consumer2.receive();
+ consumer2.acknowledge(receive);
+
+ Thread.sleep(3 * 1000);
+
+ try {
+ Producer<String> producerN = PulsarClient.builder()
+ .maxBackoffInterval(3, TimeUnit.SECONDS)
+ .operationTimeout(5, TimeUnit.SECONDS)
+ .serviceUrl(lookupUrl.toString()).connectionTimeout(2,
TimeUnit.SECONDS).build()
+ .newProducer(Schema.STRING).topic(topic).sendTimeout(3,
TimeUnit.SECONDS).create();
+ Assert.assertTrue(producerN.isConnected());
+ producerN.close();
+ } catch (Exception ex) {
+ Assert.fail("failed to create producer");
+ }
+ }
}