This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new dc7d512dae0 [fix][broker] Fix creating producer failure when set 
backlog quota. (#15663)
dc7d512dae0 is described below

commit dc7d512dae02f1e263292673ab7d7656cd6ea8d2
Author: Jiwei Guo <[email protected]>
AuthorDate: Wed May 25 05:54:55 2022 -0700

    [fix][broker] Fix creating producer failure when set backlog quota. (#15663)
    
    When trying to reproduce the problem of #15609 using the code on master, it 
was found that master also has this bug. The root cause is:
    When there is only one ledger in the ManagedLedger, after the current 
ledger is closed, it has a timestamp, and the age of that timestamp eventually 
exceeds the time limit set by the backlog quota, resulting in the failure to 
create the producer.
    
    The added test reproduces this.
    
    So when there is only one ledger, we should not exclude it.
    
    If this patch is reverted, the added test will fail.
    
    (cherry picked from commit 3a8045851f7e9ea62da104dab2b7fe2b47a95ca9)
---
 .../broker/service/persistent/PersistentTopic.java | 47 +++++++++++-----
 .../systopic/PartitionedSystemTopicTest.java       | 64 ++++++++++++++++++++++
 2 files changed, 96 insertions(+), 15 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index acb965f0429..c2d5522f502 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2558,22 +2558,9 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                     }, null);
             return future;
         } else {
-            Long ledgerId = ((ManagedCursorContainer) 
ledger.getCursors()).getSlowestReaderPosition().getLedgerId();
+            PositionImpl slowestPosition = ((ManagedCursorContainer) 
ledger.getCursors()).getSlowestReaderPosition();
             try {
-                
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo
-                        ledgerInfo = ledger.getLedgerInfo(ledgerId).get();
-                if (ledgerInfo != null && ledgerInfo.hasTimestamp() && 
ledgerInfo.getTimestamp() > 0
-                        && ((ManagedLedgerImpl) ledger).getClock().millis() - 
ledgerInfo.getTimestamp()
-                        > backlogQuotaLimitInSecond * 1000) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Time based backlog quota exceeded, quota 
{}, age of ledger "
-                                        + "slowest cursor currently on {}", 
backlogQuotaLimitInSecond * 1000,
-                                ((ManagedLedgerImpl) 
ledger).getClock().millis() - ledgerInfo.getTimestamp());
-                    }
-                    return CompletableFuture.completedFuture(true);
-                } else {
-                    return CompletableFuture.completedFuture(false);
-                }
+                return 
slowestReaderTimeBasedBacklogQuotaCheck(slowestPosition);
             } catch (Exception e) {
                 log.error("[{}][{}] Error reading entry for precise time based 
backlog check", topicName, e);
                 return CompletableFuture.completedFuture(false);
@@ -2581,6 +2568,36 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         }
     }
 
+    private CompletableFuture<Boolean> 
slowestReaderTimeBasedBacklogQuotaCheck(PositionImpl slowestPosition)
+            throws ExecutionException, InterruptedException {
+        int backlogQuotaLimitInSecond = 
getBacklogQuota(BacklogQuotaType.message_age).getLimitTime();
+        Long ledgerId = slowestPosition.getLedgerId();
+        if (((ManagedLedgerImpl) 
ledger).getLedgersInfo().lastKey().equals(ledgerId)) {
+            return CompletableFuture.completedFuture(false);
+        }
+        int result;
+        
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo
+                ledgerInfo = ledger.getLedgerInfo(ledgerId).get();
+        if (ledgerInfo != null && ledgerInfo.hasTimestamp() && 
ledgerInfo.getTimestamp() > 0
+                && ((ManagedLedgerImpl) ledger).getClock().millis() - 
ledgerInfo.getTimestamp()
+                > backlogQuotaLimitInSecond * 1000 && (result = 
slowestPosition.compareTo(
+                new PositionImpl(ledgerInfo.getLedgerId(), 
ledgerInfo.getEntries() - 1))) <= 0) {
+            if (result < 0) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Time based backlog quota exceeded, quota {}, 
age of ledger "
+                                    + "slowest cursor currently on {}", 
backlogQuotaLimitInSecond * 1000,
+                            ((ManagedLedgerImpl) ledger).getClock().millis() - 
ledgerInfo.getTimestamp());
+                }
+                return CompletableFuture.completedFuture(true);
+            } else {
+                return slowestReaderTimeBasedBacklogQuotaCheck(
+                        ((ManagedLedgerImpl) 
ledger).getNextValidPosition(slowestPosition));
+            }
+        } else {
+            return CompletableFuture.completedFuture(false);
+        }
+    }
+
     @Override
     public boolean isReplicated() {
         return !replicators.isEmpty();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index d506f712767..cf45d614f0f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -27,14 +27,18 @@ import org.apache.commons.lang.RandomStringUtils;
 import org.apache.pulsar.broker.admin.impl.BrokersBase;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.naming.TopicName;
@@ -43,6 +47,7 @@ import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
 import org.mockito.Mockito;
+import org.powermock.reflect.Whitebox;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -50,6 +55,7 @@ import org.testng.annotations.Test;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
@@ -65,6 +71,8 @@ public class PartitionedSystemTopicTest extends 
BrokerTestBase {
         conf.setAllowAutoTopicCreation(false);
         conf.setAllowAutoTopicCreationType("partitioned");
         conf.setDefaultNumPartitions(PARTITIONS);
+        conf.setManagedLedgerMaxEntriesPerLedger(1);
+        conf.setBrokerDeleteInactiveTopicsEnabled(false);
 
         conf.setSystemTopicEnabled(true);
         conf.setTopicLevelPoliciesEnabled(true);
@@ -170,4 +178,60 @@ public class PartitionedSystemTopicTest extends 
BrokerTestBase {
         });
     }
 
+    @Test
+    private void testSetBacklogCausedCreatingProducerFailure() throws 
Exception {
+        final String ns = "prop/ns-test";
+        final String topic = ns + "/topic-1";
+
+        admin.namespaces().createNamespace(ns, 2);
+        admin.topics().createPartitionedTopic(String.format("persistent://%s", 
topic), 1);
+        BacklogQuota quota = BacklogQuota.builder()
+                .limitTime(2)
+                .limitSize(-1)
+                
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
+                .build();
+        admin.namespaces().setBacklogQuota(ns, quota, 
BacklogQuota.BacklogQuotaType.message_age);
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        String partition0 = TopicName.get(String.format("persistent://%s", 
topic)).getPartition(0).toString();
+        Optional<Topic> topicReference = 
pulsar.getBrokerService().getTopicReference(partition0);
+        Assert.assertTrue(topicReference.isPresent());
+        PersistentTopic persistentTopic = (PersistentTopic) 
topicReference.get();
+        ManagedLedgerConfig config = 
persistentTopic.getManagedLedger().getConfig();
+        config.setMinimumRolloverTime(1, TimeUnit.SECONDS);
+        config.setMaximumRolloverTime(1, TimeUnit.SECONDS);
+        persistentTopic.getManagedLedger().setConfig(config);
+        Whitebox.invokeMethod(persistentTopic.getManagedLedger(), 
"updateLastLedgerCreatedTimeAndScheduleRolloverTask");
+        String msg1 = "msg-1";
+        producer.send(msg1);
+        Thread.sleep(3 * 1000);
+
+        Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("sub-1")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
+
+        Message<String> receive = consumer2.receive();
+        consumer2.acknowledge(receive);
+
+        Thread.sleep(3 * 1000);
+
+        try {
+            Producer<String> producerN = PulsarClient.builder()
+                    .maxBackoffInterval(3, TimeUnit.SECONDS)
+                    .operationTimeout(5, TimeUnit.SECONDS)
+                    .serviceUrl(lookupUrl.toString()).connectionTimeout(2, 
TimeUnit.SECONDS).build()
+                    .newProducer(Schema.STRING).topic(topic).sendTimeout(3, 
TimeUnit.SECONDS).create();
+            Assert.assertTrue(producerN.isConnected());
+            producerN.close();
+        } catch (Exception ex) {
+            Assert.fail("failed to create producer");
+        }
+    }
 }

Reply via email to