This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 30073dbac0e [fix] [broker] Producer is blocked on creation because 
backlog exceeded on topic, when dedup is enabled and no producer is there 
(#20951)
30073dbac0e is described below

commit 30073dbac0e941869b43e090d2682935e8f094e5
Author: Heesung Sohn <[email protected]>
AuthorDate: Wed Aug 23 18:03:29 2023 -0700

    [fix] [broker] Producer is blocked on creation because backlog exceeded on 
topic, when dedup is enabled and no producer is there (#20951)
---
 .../pulsar/broker/service/BacklogQuotaManager.java | 28 ++++++++++++++-
 .../service/persistent/MessageDeduplication.java   | 11 ++++++
 .../broker/service/persistent/PersistentTopic.java |  4 +++
 .../broker/service/BacklogQuotaManagerTest.java    | 40 ++++++++++++++++++----
 4 files changed, 76 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
index bc2541c802e..6ad1697adfc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
@@ -103,7 +103,10 @@ public class BacklogQuotaManager {
             break;
         case producer_exception:
         case producer_request_hold:
-            disconnectProducers(persistentTopic);
+            if (!advanceSlowestSystemCursor(persistentTopic)) {
+                // The slowest is not a system cursor. Disconnecting producers 
to put backpressure.
+                disconnectProducers(persistentTopic);
+            }
             break;
         default:
             break;
@@ -268,4 +271,27 @@ public class BacklogQuotaManager {
 
         });
     }
+
+    /**
+     * Advances the slowest cursor if that is a system cursor.
+     *
+     * @param persistentTopic
+     * @return true if the slowest cursor is a system cursor
+     */
+    private boolean advanceSlowestSystemCursor(PersistentTopic 
persistentTopic) {
+
+        ManagedLedgerImpl mLedger = (ManagedLedgerImpl) 
persistentTopic.getManagedLedger();
+        ManagedCursor slowestConsumer = mLedger.getSlowestConsumer();
+        if (slowestConsumer == null) {
+            return false;
+        }
+
+        if (PersistentTopic.isDedupCursorName(slowestConsumer.getName())) {
+            persistentTopic.getMessageDeduplication().takeSnapshot();
+            return true;
+        }
+
+        // We may need to check other system cursors here : replicator, 
compaction
+        return false;
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 490be4a8876..d1b4b74945f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -27,6 +27,7 @@ import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
@@ -130,6 +131,9 @@ public class MessageDeduplication {
 
     private final String replicatorPrefix;
 
+
+    private final AtomicBoolean snapshotTaking = new AtomicBoolean(false);
+
     public MessageDeduplication(PulsarService pulsar, PersistentTopic topic, 
ManagedLedger managedLedger) {
         this.pulsar = pulsar;
         this.topic = topic;
@@ -406,6 +410,11 @@ public class MessageDeduplication {
         if (log.isDebugEnabled()) {
             log.debug("[{}] Taking snapshot of sequence ids map", 
topic.getName());
         }
+
+        if (!snapshotTaking.compareAndSet(false, true)) {
+            return;
+        }
+
         Map<String, Long> snapshot = new TreeMap<>();
         highestSequencedPersisted.forEach((producerName, sequenceId) -> {
             if (snapshot.size() < maxNumberOfProducers) {
@@ -420,11 +429,13 @@ public class MessageDeduplication {
                     log.debug("[{}] Stored new deduplication snapshot at {}", 
topic.getName(), position);
                 }
                 lastSnapshotTimestamp = System.currentTimeMillis();
+                snapshotTaking.set(false);
             }
 
             @Override
             public void markDeleteFailed(ManagedLedgerException exception, 
Object ctx) {
                 log.warn("[{}] Failed to store new deduplication snapshot at 
{}", topic.getName(), position);
+                snapshotTaking.set(false);
             }
         }, null);
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index f5679665d46..abfa37a67bf 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -196,6 +196,10 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
     private final TopicName shadowSourceTopic;
 
     static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup";
+
+    public static boolean isDedupCursorName(String name) {
+        return DEDUPLICATION_CURSOR_NAME.equals(name);
+    }
     private static final String TOPIC_EPOCH_PROPERTY_NAME = 
"pulsar.topic.epoch";
 
     private static final double MESSAGE_EXPIRY_THRESHOLD = 1.5;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index f3463ee121d..0ac5fdaef15 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -127,8 +127,8 @@ public class BacklogQuotaManagerTest {
             config.setManagedLedgerMaxEntriesPerLedger(MAX_ENTRIES_PER_LEDGER);
             config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
             config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
-            config.setSystemTopicEnabled(false);
-            config.setTopicLevelPoliciesEnabled(false);
+            config.setSystemTopicEnabled(true);
+            config.setTopicLevelPoliciesEnabled(true);
             config.setForceDeleteNamespaceAllowed(true);
 
             pulsar = new PulsarService(config);
@@ -1169,8 +1169,13 @@ public class BacklogQuotaManagerTest {
         assertTrue(gotException, "backlog exceeded exception did not occur");
     }
 
-    @Test
-    public void testProducerExceptionAndThenUnblockSizeQuota() throws 
Exception {
+    @DataProvider(name = "dedupTestSet")
+    public static Object[][] dedupTestSet() {
+        return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
+    }
+
+    @Test(dataProvider = "dedupTestSet")
+    public void testProducerExceptionAndThenUnblockSizeQuota(boolean 
dedupTestSet) throws Exception {
         assertEquals(admin.namespaces().getBacklogQuotaMap("prop/quotahold"),
                 new HashMap<>());
         admin.namespaces().setBacklogQuota("prop/quotahold",
@@ -1186,9 +1191,12 @@ public class BacklogQuotaManagerTest {
         boolean gotException = false;
 
         Consumer<byte[]> consumer = 
client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
-
         byte[] content = new byte[1024];
         Producer<byte[]> producer = createProducer(client, topic1);
+
+        admin.topicPolicies().setDeduplicationStatus(topic1, dedupTestSet);
+        Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
+
         for (int i = 0; i < 10; i++) {
             producer.send(content);
         }
@@ -1207,6 +1215,7 @@ public class BacklogQuotaManagerTest {
         }
 
         assertTrue(gotException, "backlog exceeded exception did not occur");
+        assertFalse(producer.isConnected());
         // now remove backlog and ensure that producer is unblocked;
 
         TopicStats stats = getTopicStats(topic1);
@@ -1223,14 +1232,33 @@ public class BacklogQuotaManagerTest {
         Exception sendException = null;
         gotException = false;
         try {
-            for (int i = 0; i < 5; i++) {
+            for (int i = 0; i < 10; i++) {
                 producer.send(content);
+                Message<?> msg = consumer.receive();
+                consumer.acknowledge(msg);
             }
         } catch (Exception e) {
             gotException = true;
             sendException = e;
         }
+        Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
         assertFalse(gotException, "unable to publish due to " + sendException);
+
+        gotException = false;
+        long lastDisconnectedTimestamp = 
producer.getLastDisconnectedTimestamp();
+        try {
+            // try to send over backlog quota and make sure it passes
+            producer.send(content);
+            producer.send(content);
+        } catch (PulsarClientException ce) {
+            assertTrue(ce instanceof 
PulsarClientException.ProducerBlockedQuotaExceededException
+                    || ce instanceof PulsarClientException.TimeoutException, 
ce.getMessage());
+            gotException = true;
+            sendException = ce;
+        }
+        assertFalse(gotException, "unable to publish due to " + sendException);
+        assertEquals(lastDisconnectedTimestamp, 
producer.getLastDisconnectedTimestamp());
+
     }
 
     @Test

Reply via email to