This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7822dca1ffe [fix][broker] Broker is failing to create non-durable sub
if topic is fenced (#23579)
7822dca1ffe is described below
commit 7822dca1ffe45324d7af7ef830c617f6881a5431
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Mon Nov 11 12:26:06 2024 -0800
[fix][broker] Broker is failing to create non-durable sub if topic is
fenced (#23579)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 3 ++-
.../broker/service/persistent/PersistentTopic.java | 5 ++++
.../client/api/SimpleProducerConsumerTest.java | 27 ++++++++++++++++++++++
3 files changed, 34 insertions(+), 1 deletion(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index c1081761b60..926e7c7f810 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3930,7 +3930,8 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
}
}
- synchronized void setFenced() {
+ @VisibleForTesting
+ public synchronized void setFenced() {
log.info("{} Moving to Fenced state", name);
STATE_UPDATER.set(this, State.Fenced);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 541c8a7a225..651d1237362 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1050,6 +1050,11 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
} else if (ex.getCause() instanceof
BrokerServiceException.SubscriptionFencedException
&& isCompactionSubscription(subscriptionName)) {
log.warn("[{}] Failed to create compaction subscription:
{}", topic, ex.getMessage());
+ } else if (ex.getCause() instanceof
ManagedLedgerFencedException) {
+ // If the topic has been fenced, we cannot continue using
it. We need to close and reopen
+ log.warn("[{}][{}] has been fenced. closing the topic {}",
topic, subscriptionName,
+ ex.getMessage());
+ close();
} else {
log.error("[{}] Failed to create subscription: {}", topic,
subscriptionName, ex);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index e99abac1ec4..78d28e4b228 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -4979,4 +4979,31 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
return 0;
}
}
+
+ @Test
+ public void testFencedLedger() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ final String topic = "persistent://my-property/my-ns/fencedLedger";
+
+ @Cleanup
+ PulsarClient newPulsarClient =
PulsarClient.builder().serviceUrl(lookupUrl.toString()).build();
+
+ @Cleanup
+ Producer<byte[]> producer =
newPulsarClient.newProducer().topic(topic).enableBatching(false).create();
+
+ final int numMessages = 5;
+ for (int i = 0; i < numMessages; i++) {
+ producer.newMessage().value(("value-" +
i).getBytes(UTF_8)).eventTime((i + 1) * 100L).sendAsync();
+ }
+ producer.flush();
+
+ PersistentTopic pTopic = (PersistentTopic)
pulsar.getBrokerService().getTopicReference(topic).get();
+ ManagedLedgerImpl ml = (ManagedLedgerImpl) pTopic.getManagedLedger();
+ ml.setFenced();
+
+ Reader<byte[]> reader =
newPulsarClient.newReader().topic(topic).startMessageId(MessageId.earliest)
+ .createAsync().get(5, TimeUnit.SECONDS);
+ assertNotNull(reader);
+ }
}