This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 438fd83897b1e85ff4d3ca065c93184bd5a32c5b Author: fengyubiao <[email protected]> AuthorDate: Mon Dec 15 21:17:01 2025 +0800 [fix][broker]Infinitely failed to delete topic if the first time failed and enabled transaction (#25073) (cherry picked from commit c572e82c7a06afefad0c5e8aed8885b16fcdd91a) --- .../broker/service/persistent/PersistentTopic.java | 2 +- .../transaction/buffer/TransactionBuffer.java | 5 ++++ .../buffer/impl/InMemTransactionBuffer.java | 5 ++++ .../buffer/impl/TopicTransactionBuffer.java | 11 ++++++++ .../buffer/impl/TopicTransactionBufferState.java | 11 +++++++- .../buffer/impl/TransactionBufferDisable.java | 5 ++++ .../broker/admin/v3/AdminApiTransactionTest.java | 30 ++++++++++++++++++++++ .../bookkeeper/client/PulsarMockBookKeeper.java | 4 ++- 8 files changed, 70 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index e7c075243ef..e7d0ddeac2e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -4795,7 +4795,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal } private CompletableFuture<Void> transactionBufferCleanupAndClose() { - return transactionBuffer.clearSnapshot().thenCompose(__ -> transactionBuffer.closeAsync()); + return transactionBuffer.clearSnapshotAndClose(); } public Optional<TopicName> getShadowSourceTopic() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java index 874f4c1c28a..886f58fdc18 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java @@ -127,6 +127,11 @@ public interface TransactionBuffer { */ CompletableFuture<Void> clearSnapshot(); + /** + * Clear up the snapshot of the TransactionBuffer and close it. + */ + CompletableFuture<Void> clearSnapshotAndClose(); + /** * Close the buffer asynchronously. * diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java index 55310537b00..d0737000212 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java @@ -364,6 +364,11 @@ public class InMemTransactionBuffer implements TransactionBuffer { return CompletableFuture.completedFuture(null); } + @Override + public CompletableFuture<Void> clearSnapshotAndClose() { + return clearSnapshot().thenCompose(__ -> closeAsync()); + } + @Override public CompletableFuture<Void> closeAsync() { buffers.values().forEach(TxnBuffer::close); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index 5c3fefacb4e..77d28a61654 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -634,6 +634,17 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen return snapshotAbortedTxnProcessor.clearAbortedTxnSnapshot(); } + @Override + public CompletableFuture<Void> clearSnapshotAndClose() { + if (checkIfClosedAndCleared()) { + return CompletableFuture.completedFuture(null); + } + return snapshotAbortedTxnProcessor.clearAbortedTxnSnapshot().thenCompose(__ -> closeAsync()) + .thenAccept(__ -> { + changeToClosedAndClearedState(); + }); + } + @Override public CompletableFuture<Void> closeAsync() { synchronized (pendingAppendingTxnBufferTasks) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java index 9a8f2041bf4..abfb58eb658 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java @@ -33,6 +33,7 @@ public abstract class TopicTransactionBufferState { Initializing, Ready, Close, + ClosedAndCleared, NoSnapshot, FirstSnapshotting } @@ -71,6 +72,10 @@ public abstract class TopicTransactionBufferState { STATE_UPDATER.set(this, State.Close); } + protected void changeToClosedAndClearedState() { + STATE_UPDATER.compareAndSet(this, State.Close, State.ClosedAndCleared); + } + public boolean checkIfInitializing() { return STATE_UPDATER.get(this) == State.Initializing; } @@ -88,7 +93,11 @@ public abstract class TopicTransactionBufferState { } public boolean checkIfClosed() { - return STATE_UPDATER.get(this) == State.Close; + return STATE_UPDATER.get(this) == State.Close || STATE_UPDATER.get(this) == State.ClosedAndCleared; + } + + public boolean checkIfClosedAndCleared() { + return STATE_UPDATER.get(this) == State.ClosedAndCleared; } public State getState() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java index f6e2ad04e50..7d59ab5dd14 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java @@ -87,6 +87,11 @@ public class TransactionBufferDisable implements TransactionBuffer { return CompletableFuture.completedFuture(null); } + @Override + public CompletableFuture<Void> clearSnapshotAndClose() { + return clearSnapshot().thenCompose(__ -> closeAsync()); + } + @Override public CompletableFuture<Void> closeAsync() { return CompletableFuture.completedFuture(null); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java index 3d1c80ba697..b567fa21ef8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java @@ -80,11 +80,13 @@ import org.apache.pulsar.common.policies.data.TransactionMetadata; import org.apache.pulsar.common.policies.data.TransactionPendingAckInternalStats; import org.apache.pulsar.common.policies.data.TransactionPendingAckStats; import org.apache.pulsar.common.stats.PositionInPendingAckStats; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider; import org.apache.pulsar.transaction.coordinator.TxnMeta; import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException; import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl; import org.apache.pulsar.transaction.coordinator.proto.TxnStatus; +import org.apache.zookeeper.KeeperException; import org.awaitility.Awaitility; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -1111,4 +1113,32 @@ public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest { assertEquals(persistentTopicStats.ledgers.size(), internalStats.ledgers.size()); assertEquals(persistentTopicStats.cursors.size(), internalStats.cursors.size()); } + + @Test + public void testRetryDeleteTopicAfterFailedDeleteLedger() throws Exception { + // Create a topic. + final String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + admin.topics().createNonPartitionedTopic(tpName); + Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(tpName).create(); + producer.send("1"); + producer.close(); + // The first deleting should fail, since we injected an error. + pulsarTestContext.getMockZooKeeper().failConditional(KeeperException.Code.BADVERSION, (op, path) -> { + if ("DELETE".equals(op.toString()) + && path.endsWith(TopicName.get(tpName).getPersistenceNamingEncoding())) { + return true; + } + return false; + }); + try { + admin.topics().delete(tpName); + fail("The deleting should fail because we injected an error"); + } catch (Throwable ex) { + // expected + Throwable actEx = FutureUtil.unwrapCompletionException(ex); + assertTrue(actEx.getMessage().contains("BadVersionException")); + } + // The second deleting should succeed. + admin.topics().delete(tpName); + } } diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java index 64fe3796709..8848369f302 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java @@ -350,7 +350,9 @@ public class PulsarMockBookKeeper extends BookKeeper { failures.add(delayFuture); } - + /** + * @param rc see also {@link org.apache.bookkeeper.client.BKException.Code}. + */ public void failNow(int rc) { failAfter(0, rc); }
