This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit dc2557c7501544e28e245f34f8b5bc258a2a3fd6
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Mon Nov 11 12:26:06 2024 -0800

    [fix][broker] Broker is failing to create non-durable sub if topic is 
fenced (#23579)
    
    (cherry picked from commit 7822dca1ffe45324d7af7ef830c617f6881a5431)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  3 ++-
 .../broker/service/persistent/PersistentTopic.java |  5 ++++
 .../client/api/SimpleProducerConsumerTest.java     | 27 ++++++++++++++++++++++
 3 files changed, 34 insertions(+), 1 deletion(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 88cb6bd8f46..a5478dba432 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3937,7 +3937,8 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         }
     }
 
-    synchronized void setFenced() {
+    @VisibleForTesting
+    public synchronized void setFenced() {
         log.info("{} Moving to Fenced state", name);
         STATE_UPDATER.set(this, State.Fenced);
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 22f99c4579a..0e6e0681570 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1093,6 +1093,11 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                 } else if (ex.getCause() instanceof 
BrokerServiceException.SubscriptionFencedException
                         && isCompactionSubscription(subscriptionName)) {
                     log.warn("[{}] Failed to create compaction subscription: 
{}", topic, ex.getMessage());
+                } else if (ex.getCause() instanceof 
ManagedLedgerFencedException) {
+                    // If the topic has been fenced, we cannot continue using 
it. We need to close and reopen
+                    log.warn("[{}][{}] has been fenced. closing the topic {}", 
topic, subscriptionName,
+                            ex.getMessage());
+                    close();
                 } else {
                     log.error("[{}] Failed to create subscription: {}", topic, 
subscriptionName, ex);
                 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 3edf2dd42aa..2dbe1682c1d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -4871,4 +4871,31 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
             return 0;
         }
     }
+
+    @Test
+    public void testFencedLedger() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        final String topic = "persistent://my-property/my-ns/fencedLedger";
+
+        @Cleanup
+        PulsarClient newPulsarClient = 
PulsarClient.builder().serviceUrl(lookupUrl.toString()).build();
+
+        @Cleanup
+        Producer<byte[]> producer = 
newPulsarClient.newProducer().topic(topic).enableBatching(false).create();
+
+        final int numMessages = 5;
+        for (int i = 0; i < numMessages; i++) {
+            producer.newMessage().value(("value-" + 
i).getBytes(UTF_8)).eventTime((i + 1) * 100L).sendAsync();
+        }
+        producer.flush();
+
+        PersistentTopic pTopic = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topic).get();
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) pTopic.getManagedLedger();
+        ml.setFenced();
+
+        Reader<byte[]> reader = 
newPulsarClient.newReader().topic(topic).startMessageId(MessageId.earliest)
+                .createAsync().get(5, TimeUnit.SECONDS);
+        assertNotNull(reader);
+    }
 }

Reply via email to