This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new cc8895e531d [fix][broker] Fix creating producer failure when set
backlog quota. (#15663)
cc8895e531d is described below
commit cc8895e531d872cdba7dcda2f44a9aa1ecd0290b
Author: Jiwei Guo <[email protected]>
AuthorDate: Wed May 25 05:54:55 2022 -0700
[fix][broker] Fix creating producer failure when set backlog quota. (#15663)
When trying to reproduce the problem of #15609 using the master's code, it
was found that the master also had this bug. The root cause is:
When there is only one ledger in the ManagedLedger, after the current
ledger is closed, it has the timestamp and exceeds the time set by the
backlog-qutoa, resulting in the failure to create the producer.
The added test could reproduce this.
So when there is only one ledger, we should not exclude it.
If revert this patch, the added test will fail.
(cherry picked from commit 3a8045851f7e9ea62da104dab2b7fe2b47a95ca9)
---
.../broker/service/persistent/PersistentTopic.java | 47 +++++++++-----
.../systopic/PartitionedSystemTopicTest.java | 74 +++++++++++++++++++++-
2 files changed, 105 insertions(+), 16 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 6746320e642..a583aecc1f2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2597,22 +2597,9 @@ public class PersistentTopic extends AbstractTopic
return false;
}
} else {
- Long ledgerId = ((ManagedCursorContainer)
ledger.getCursors()).getSlowestReaderPosition().getLedgerId();
+ PositionImpl slowestPosition = ((ManagedCursorContainer)
ledger.getCursors()).getSlowestReaderPosition();
try {
-
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo
- ledgerInfo = ledger.getLedgerInfo(ledgerId).get();
- if (ledgerInfo != null && ledgerInfo.hasTimestamp() &&
ledgerInfo.getTimestamp() > 0
- && ((ManagedLedgerImpl) ledger).getClock().millis() -
ledgerInfo.getTimestamp()
- > backlogQuotaLimitInSecond * 1000) {
- if (log.isDebugEnabled()) {
- log.debug("Time based backlog quota exceeded, quota
{}, age of ledger "
- + "slowest cursor currently on {}",
backlogQuotaLimitInSecond * 1000,
- ((ManagedLedgerImpl)
ledger).getClock().millis() - ledgerInfo.getTimestamp());
- }
- return true;
- } else {
- return false;
- }
+ return
slowestReaderTimeBasedBacklogQuotaCheck(slowestPosition);
} catch (Exception e) {
log.error("[{}][{}] Error reading entry for precise time based
backlog check", topicName, e);
return false;
@@ -2620,6 +2607,36 @@ public class PersistentTopic extends AbstractTopic
}
}
+ private boolean slowestReaderTimeBasedBacklogQuotaCheck(PositionImpl
slowestPosition)
+ throws ExecutionException, InterruptedException {
+ int backlogQuotaLimitInSecond =
getBacklogQuota(BacklogQuota.BacklogQuotaType.message_age).getLimitTime();
+ Long ledgerId = slowestPosition.getLedgerId();
+ if (((ManagedLedgerImpl)
ledger).getLedgersInfo().lastKey().equals(ledgerId)) {
+ return false;
+ }
+ int result;
+
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo
+ ledgerInfo = ledger.getLedgerInfo(ledgerId).get();
+ if (ledgerInfo != null && ledgerInfo.hasTimestamp() &&
ledgerInfo.getTimestamp() > 0
+ && ((ManagedLedgerImpl) ledger).getClock().millis() -
ledgerInfo.getTimestamp()
+ > backlogQuotaLimitInSecond * 1000 && (result =
slowestPosition.compareTo(
+ new PositionImpl(ledgerInfo.getLedgerId(),
ledgerInfo.getEntries() - 1))) <= 0) {
+ if (result < 0) {
+ if (log.isDebugEnabled()) {
+ log.debug("Time based backlog quota exceeded, quota {},
age of ledger "
+ + "slowest cursor currently on {}",
backlogQuotaLimitInSecond * 1000,
+ ((ManagedLedgerImpl) ledger).getClock().millis() -
ledgerInfo.getTimestamp());
+ }
+ return true;
+ } else {
+ return slowestReaderTimeBasedBacklogQuotaCheck(
+ ((ManagedLedgerImpl)
ledger).getNextValidPosition(slowestPosition));
+ }
+ } else {
+ return false;
+ }
+ }
+
@Override
public boolean isReplicated() {
return !replicators.isEmpty();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index 6517e8de34d..7f069356e23 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -19,18 +19,29 @@
package org.apache.pulsar.broker.systopic;
import com.google.common.collect.Sets;
+import lombok.Cleanup;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.pulsar.broker.admin.impl.BrokersBase;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.events.EventsTopicNames;
-import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.FutureUtil;
+import org.powermock.reflect.Whitebox;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -38,7 +49,9 @@ import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
@Test(groups = "broker")
public class PartitionedSystemTopicTest extends BrokerTestBase {
@@ -52,6 +65,8 @@ public class PartitionedSystemTopicTest extends
BrokerTestBase {
conf.setAllowAutoTopicCreation(false);
conf.setAllowAutoTopicCreationType("partitioned");
conf.setDefaultNumPartitions(PARTITIONS);
+ conf.setManagedLedgerMaxEntriesPerLedger(1);
+ conf.setBrokerDeleteInactiveTopicsEnabled(false);
conf.setSystemTopicEnabled(true);
conf.setTopicLevelPoliciesEnabled(true);
@@ -122,4 +137,61 @@ public class PartitionedSystemTopicTest extends
BrokerTestBase {
Assert.assertEquals(admin.topics().getStats(topicName.toString()).getMsgInCounter(),
1);
Assert.assertEquals(persistentTopic.getManagedLedger().getOffloadedSize(), 0);
}
+
+ @Test
+ private void testSetBacklogCausedCreatingProducerFailure() throws
Exception {
+ final String ns = "prop/ns-test";
+ final String topic = ns + "/topic-1";
+
+ admin.namespaces().createNamespace(ns, 2);
+ admin.topics().createPartitionedTopic(String.format("persistent://%s",
topic), 1);
+ BacklogQuota quota = BacklogQuota.builder()
+ .limitTime(2)
+ .limitSize(-1)
+
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
+ .build();
+ admin.namespaces().setBacklogQuota(ns, quota,
BacklogQuota.BacklogQuotaType.message_age);
+
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .create();
+
+ String partition0 = TopicName.get(String.format("persistent://%s",
topic)).getPartition(0).toString();
+ Optional<Topic> topicReference =
pulsar.getBrokerService().getTopicReference(partition0);
+ Assert.assertTrue(topicReference.isPresent());
+ PersistentTopic persistentTopic = (PersistentTopic)
topicReference.get();
+ ManagedLedgerConfig config =
persistentTopic.getManagedLedger().getConfig();
+ config.setMinimumRolloverTime(1, TimeUnit.SECONDS);
+ config.setMaximumRolloverTime(1, TimeUnit.SECONDS);
+ persistentTopic.getManagedLedger().setConfig(config);
+ Whitebox.invokeMethod(persistentTopic.getManagedLedger(),
"updateLastLedgerCreatedTimeAndScheduleRolloverTask");
+ String msg1 = "msg-1";
+ producer.send(msg1);
+ Thread.sleep(3 * 1000);
+
+ Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("sub-1")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .subscribe();
+
+ Message<String> receive = consumer2.receive();
+ consumer2.acknowledge(receive);
+
+ Thread.sleep(3 * 1000);
+
+ try {
+ Producer<String> producerN = PulsarClient.builder()
+ .maxBackoffInterval(3, TimeUnit.SECONDS)
+ .operationTimeout(5, TimeUnit.SECONDS)
+ .serviceUrl(lookupUrl.toString()).connectionTimeout(2,
TimeUnit.SECONDS).build()
+ .newProducer(Schema.STRING).topic(topic).sendTimeout(3,
TimeUnit.SECONDS).create();
+ Assert.assertTrue(producerN.isConnected());
+ producerN.close();
+ } catch (Exception ex) {
+ Assert.fail("failed to create producer");
+ }
+ }
}