This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 662edf54eac595b23beec0e92e2167cdd5113fac Author: fengyubiao <[email protected]> AuthorDate: Wed Jan 7 12:41:43 2026 +0800 [fix][broker]Topic deleting failed after removed local cluster from namespace policies (#25114) (cherry picked from commit b4b05b530dc623f128e744b225908141a4986cda) --- .../broker/service/persistent/PersistentTopic.java | 43 ++++++++++------------ .../service/OneWayReplicatorUsingGlobalZKTest.java | 1 + 2 files changed, 20 insertions(+), 24 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 73a773dd741..451471e215e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1276,34 +1276,29 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal .getTransactionPendingAckStoreSuffix(topic, Codec.encode(subscriptionName))); if (brokerService.pulsar().getConfiguration().isTransactionCoordinatorEnabled()) { - CompletableFuture<ManagedLedgerConfig> managedLedgerConfig = getBrokerService().getManagedLedgerConfig(tn); - managedLedgerConfig.thenAccept(config -> { - ManagedLedgerFactory managedLedgerFactory = - getBrokerService().getManagedLedgerFactoryForTopic(tn, config.getStorageClassName()); + ManagedLedgerConfig managedLedgerConfig = ledger.getConfig(); + ManagedLedgerFactory managedLedgerFactory = getBrokerService() + .getManagedLedgerFactoryForTopic(tn, managedLedgerConfig.getStorageClassName()); managedLedgerFactory.asyncDelete(tn.getPersistenceNamingEncoding(), - managedLedgerConfig, - new AsyncCallbacks.DeleteLedgerCallback() { - @Override - public void deleteLedgerComplete(Object ctx) { + CompletableFuture.completedFuture(managedLedgerConfig), + new AsyncCallbacks.DeleteLedgerCallback() { + @Override + public void deleteLedgerComplete(Object ctx) { + asyncDeleteCursorWithClearDelayedMessage(subscriptionName, unsubscribeFuture); + } + + @Override + public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) { + if (exception instanceof MetadataNotFoundException) { asyncDeleteCursorWithClearDelayedMessage(subscriptionName, unsubscribeFuture); + return; } - @Override - public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) { - if (exception instanceof MetadataNotFoundException) { - asyncDeleteCursorWithClearDelayedMessage(subscriptionName, unsubscribeFuture); - return; - } - - unsubscribeFuture.completeExceptionally(exception); - log.error("[{}][{}] Error deleting subscription pending ack store", - topic, subscriptionName, exception); - } - }, null); - }).exceptionally(ex -> { - unsubscribeFuture.completeExceptionally(ex); - return null; - }); + unsubscribeFuture.completeExceptionally(exception); + log.error("[{}][{}] Error deleting subscription pending ack store", + topic, subscriptionName, exception); + } + }, null); } else { asyncDeleteCursorWithClearDelayedMessage(subscriptionName, unsubscribeFuture); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java index 60672845b5e..db16963f208 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java @@ -495,6 +495,7 @@ public class OneWayReplicatorUsingGlobalZKTest extends OneWayReplicatorTest { admin1.namespaces().createNamespace(ns1); admin1.namespaces().setNamespaceReplicationClusters(ns1, new HashSet<>(Arrays.asList(cluster1, cluster2))); admin1.topics().createNonPartitionedTopic(topic); + admin1.topics().createSubscription(topic, "s1", MessageId.earliest); // Wait for loading topic up. Producer<String> p = client1.newProducer(Schema.STRING).topic(topic).create();
