This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 3115a4f69e2 [fix][broker]Infinitely failed to delete topic if the 
first time failed and enabled transaction (#25073)
3115a4f69e2 is described below

commit 3115a4f69e2b1bc3aede91a07b2461cf537629d1
Author: fengyubiao <[email protected]>
AuthorDate: Mon Dec 15 21:17:01 2025 +0800

    [fix][broker]Infinitely failed to delete topic if the first time failed and 
enabled transaction (#25073)
---
 .../broker/service/persistent/PersistentTopic.java |  2 +-
 .../transaction/buffer/TransactionBuffer.java      |  5 ++++
 .../buffer/impl/InMemTransactionBuffer.java        |  5 ++++
 .../buffer/impl/TopicTransactionBuffer.java        | 11 ++++++++
 .../buffer/impl/TopicTransactionBufferState.java   | 11 +++++++-
 .../buffer/impl/TransactionBufferDisable.java      |  5 ++++
 .../broker/admin/v3/AdminApiTransactionTest.java   | 30 ++++++++++++++++++++++
 .../bookkeeper/client/PulsarMockBookKeeper.java    |  4 ++-
 8 files changed, 70 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 422916602c8..22f3be27d58 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -4826,7 +4826,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
     }
 
     private CompletableFuture<Void> transactionBufferCleanupAndClose() {
-        return transactionBuffer.clearSnapshot().thenCompose(__ -> 
transactionBuffer.closeAsync());
+        return transactionBuffer.clearSnapshotAndClose();
     }
 
     public Optional<TopicName> getShadowSourceTopic() {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
index 874f4c1c28a..886f58fdc18 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
@@ -127,6 +127,11 @@ public interface TransactionBuffer {
      */
     CompletableFuture<Void> clearSnapshot();
 
+    /**
+     * Clear up the snapshot of the TransactionBuffer and close it.
+     */
+    CompletableFuture<Void> clearSnapshotAndClose();
+
     /**
      * Close the buffer asynchronously.
      *
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
index 55310537b00..d0737000212 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
@@ -364,6 +364,11 @@ public class InMemTransactionBuffer implements 
TransactionBuffer {
         return CompletableFuture.completedFuture(null);
     }
 
+    @Override
+    public CompletableFuture<Void> clearSnapshotAndClose() {
+        return clearSnapshot().thenCompose(__ -> closeAsync());
+    }
+
     @Override
     public CompletableFuture<Void> closeAsync() {
         buffers.values().forEach(TxnBuffer::close);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 2df6e717981..7ae311a5463 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -637,6 +637,17 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
         return snapshotAbortedTxnProcessor.clearAbortedTxnSnapshot();
     }
 
+    @Override
+    public CompletableFuture<Void> clearSnapshotAndClose() {
+        if (checkIfClosedAndCleared()) {
+            return CompletableFuture.completedFuture(null);
+        }
+        return 
snapshotAbortedTxnProcessor.clearAbortedTxnSnapshot().thenCompose(__ -> 
closeAsync())
+            .thenAccept(__ -> {
+                changeToClosedAndClearedState();
+            });
+    }
+
     @Override
     public CompletableFuture<Void> closeAsync() {
         synchronized (pendingAppendingTxnBufferTasks) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java
index 9a8f2041bf4..abfb58eb658 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java
@@ -33,6 +33,7 @@ public abstract class TopicTransactionBufferState {
         Initializing,
         Ready,
         Close,
+        ClosedAndCleared,
         NoSnapshot,
         FirstSnapshotting
     }
@@ -71,6 +72,10 @@ public abstract class TopicTransactionBufferState {
         STATE_UPDATER.set(this, State.Close);
     }
 
+    protected void changeToClosedAndClearedState() {
+        STATE_UPDATER.compareAndSet(this, State.Close, State.ClosedAndCleared);
+    }
+
     public boolean checkIfInitializing() {
         return STATE_UPDATER.get(this) == State.Initializing;
     }
@@ -88,7 +93,11 @@ public abstract class TopicTransactionBufferState {
     }
 
     public boolean checkIfClosed() {
-        return STATE_UPDATER.get(this) == State.Close;
+        return STATE_UPDATER.get(this) == State.Close || 
STATE_UPDATER.get(this) == State.ClosedAndCleared;
+    }
+
+    public boolean checkIfClosedAndCleared() {
+        return STATE_UPDATER.get(this) == State.ClosedAndCleared;
     }
 
     public State getState() {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
index f6e2ad04e50..7d59ab5dd14 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
@@ -87,6 +87,11 @@ public class TransactionBufferDisable implements 
TransactionBuffer {
         return CompletableFuture.completedFuture(null);
     }
 
+    @Override
+    public CompletableFuture<Void> clearSnapshotAndClose() {
+        return clearSnapshot().thenCompose(__ -> closeAsync());
+    }
+
     @Override
     public CompletableFuture<Void> closeAsync() {
         return CompletableFuture.completedFuture(null);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
index e567d1d5281..b3c4b804286 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
@@ -80,11 +80,13 @@ import 
org.apache.pulsar.common.policies.data.TransactionMetadata;
 import 
org.apache.pulsar.common.policies.data.TransactionPendingAckInternalStats;
 import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
 import org.apache.pulsar.common.stats.PositionInPendingAckStats;
+import org.apache.pulsar.common.util.FutureUtil;
 import 
org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider;
 import org.apache.pulsar.transaction.coordinator.TxnMeta;
 import 
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
 import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
+import org.apache.zookeeper.KeeperException;
 import org.awaitility.Awaitility;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -1110,4 +1112,32 @@ public class AdminApiTransactionTest extends 
MockedPulsarServiceBaseTest {
         assertEquals(persistentTopicStats.ledgers.size(), 
internalStats.ledgers.size());
         assertEquals(persistentTopicStats.cursors.size(), 
internalStats.cursors.size());
     }
+
+    @Test
+    public void testRetryDeleteTopicAfterFailedDeleteLedger() throws Exception 
{
+        // Create a topic.
+        final String tpName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        admin.topics().createNonPartitionedTopic(tpName);
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(tpName).create();
+        producer.send("1");
+        producer.close();
+        // The first deleting should fail, since we injected an error.
+        
pulsarTestContext.getMockZooKeeper().failConditional(KeeperException.Code.BADVERSION,
 (op, path) -> {
+            if ("DELETE".equals(op.toString())
+                    && 
path.endsWith(TopicName.get(tpName).getPersistenceNamingEncoding())) {
+                return true;
+            }
+            return false;
+        });
+        try {
+            admin.topics().delete(tpName);
+            fail("The deleting should fail because we injected an error");
+        } catch (Throwable ex) {
+            // expected
+            Throwable actEx = FutureUtil.unwrapCompletionException(ex);
+            assertTrue(actEx.getMessage().contains("BadVersionException"));
+        }
+        // The second deleting should succeed.
+        admin.topics().delete(tpName);
+    }
 }
diff --git 
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
 
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
index 9a195159377..360d82e0791 100644
--- 
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
+++ 
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
@@ -354,7 +354,9 @@ public class PulsarMockBookKeeper extends BookKeeper {
         failures.add(delayFuture);
     }
 
-
+    /**
+     * @param rc see also {@link 
org.apache.bookkeeper.client.BKException.Code}.
+     */
     public void failNow(int rc) {
         failAfter(0, rc);
     }

Reply via email to