This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit dc2557c7501544e28e245f34f8b5bc258a2a3fd6 Author: Rajan Dhabalia <[email protected]> AuthorDate: Mon Nov 11 12:26:06 2024 -0800 [fix][broker] Broker is failing to create non-durable sub if topic is fenced (#23579) (cherry picked from commit 7822dca1ffe45324d7af7ef830c617f6881a5431) --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 3 ++- .../broker/service/persistent/PersistentTopic.java | 5 ++++ .../client/api/SimpleProducerConsumerTest.java | 27 ++++++++++++++++++++++ 3 files changed, 34 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 88cb6bd8f46..a5478dba432 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3937,7 +3937,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } } - synchronized void setFenced() { + @VisibleForTesting + public synchronized void setFenced() { log.info("{} Moving to Fenced state", name); STATE_UPDATER.set(this, State.Fenced); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 22f99c4579a..0e6e0681570 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1093,6 +1093,11 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal } else if (ex.getCause() instanceof BrokerServiceException.SubscriptionFencedException && isCompactionSubscription(subscriptionName)) { log.warn("[{}] Failed to create compaction subscription: {}", topic, ex.getMessage()); + } else if (ex.getCause() instanceof ManagedLedgerFencedException) { + // If the topic has been fenced, we cannot continue using it. We need to close and reopen + log.warn("[{}][{}] has been fenced. closing the topic {}", topic, subscriptionName, + ex.getMessage()); + close(); } else { log.error("[{}] Failed to create subscription: {}", topic, subscriptionName, ex); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 3edf2dd42aa..2dbe1682c1d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -4871,4 +4871,31 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { return 0; } } + + @Test + public void testFencedLedger() throws Exception { + log.info("-- Starting {} test --", methodName); + + final String topic = "persistent://my-property/my-ns/fencedLedger"; + + @Cleanup + PulsarClient newPulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).build(); + + @Cleanup + Producer<byte[]> producer = newPulsarClient.newProducer().topic(topic).enableBatching(false).create(); + + final int numMessages = 5; + for (int i = 0; i < numMessages; i++) { + producer.newMessage().value(("value-" + i).getBytes(UTF_8)).eventTime((i + 1) * 100L).sendAsync(); + } + producer.flush(); + + PersistentTopic pTopic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get(); + ManagedLedgerImpl ml = (ManagedLedgerImpl) pTopic.getManagedLedger(); + ml.setFenced(); + + Reader<byte[]> reader = newPulsarClient.newReader().topic(topic).startMessageId(MessageId.earliest) + .createAsync().get(5, TimeUnit.SECONDS); + assertNotNull(reader); + } }
