This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 3115a4f69e2 [fix][broker]Infinitely failed to delete topic if the
first time failed and enabled transaction (#25073)
3115a4f69e2 is described below
commit 3115a4f69e2b1bc3aede91a07b2461cf537629d1
Author: fengyubiao <[email protected]>
AuthorDate: Mon Dec 15 21:17:01 2025 +0800
[fix][broker]Infinitely failed to delete topic if the first time failed and
enabled transaction (#25073)
---
.../broker/service/persistent/PersistentTopic.java | 2 +-
.../transaction/buffer/TransactionBuffer.java | 5 ++++
.../buffer/impl/InMemTransactionBuffer.java | 5 ++++
.../buffer/impl/TopicTransactionBuffer.java | 11 ++++++++
.../buffer/impl/TopicTransactionBufferState.java | 11 +++++++-
.../buffer/impl/TransactionBufferDisable.java | 5 ++++
.../broker/admin/v3/AdminApiTransactionTest.java | 30 ++++++++++++++++++++++
.../bookkeeper/client/PulsarMockBookKeeper.java | 4 ++-
8 files changed, 70 insertions(+), 3 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 422916602c8..22f3be27d58 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -4826,7 +4826,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
}
private CompletableFuture<Void> transactionBufferCleanupAndClose() {
- return transactionBuffer.clearSnapshot().thenCompose(__ ->
transactionBuffer.closeAsync());
+ return transactionBuffer.clearSnapshotAndClose();
}
public Optional<TopicName> getShadowSourceTopic() {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
index 874f4c1c28a..886f58fdc18 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
@@ -127,6 +127,11 @@ public interface TransactionBuffer {
*/
CompletableFuture<Void> clearSnapshot();
+ /**
+ * Clear the snapshot of the TransactionBuffer, then close the buffer.
+ */
+ CompletableFuture<Void> clearSnapshotAndClose();
+
/**
* Close the buffer asynchronously.
*
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
index 55310537b00..d0737000212 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
@@ -364,6 +364,11 @@ public class InMemTransactionBuffer implements
TransactionBuffer {
return CompletableFuture.completedFuture(null);
}
+ @Override
+ public CompletableFuture<Void> clearSnapshotAndClose() {
+ return clearSnapshot().thenCompose(__ -> closeAsync());
+ }
+
@Override
public CompletableFuture<Void> closeAsync() {
buffers.values().forEach(TxnBuffer::close);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 2df6e717981..7ae311a5463 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -637,6 +637,17 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
return snapshotAbortedTxnProcessor.clearAbortedTxnSnapshot();
}
+ @Override
+ public CompletableFuture<Void> clearSnapshotAndClose() {
+ if (checkIfClosedAndCleared()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ return
snapshotAbortedTxnProcessor.clearAbortedTxnSnapshot().thenCompose(__ ->
closeAsync())
+ .thenAccept(__ -> {
+ changeToClosedAndClearedState();
+ });
+ }
+
@Override
public CompletableFuture<Void> closeAsync() {
synchronized (pendingAppendingTxnBufferTasks) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java
index 9a8f2041bf4..abfb58eb658 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java
@@ -33,6 +33,7 @@ public abstract class TopicTransactionBufferState {
Initializing,
Ready,
Close,
+ ClosedAndCleared,
NoSnapshot,
FirstSnapshotting
}
@@ -71,6 +72,10 @@ public abstract class TopicTransactionBufferState {
STATE_UPDATER.set(this, State.Close);
}
+ protected void changeToClosedAndClearedState() {
+ STATE_UPDATER.compareAndSet(this, State.Close, State.ClosedAndCleared);
+ }
+
public boolean checkIfInitializing() {
return STATE_UPDATER.get(this) == State.Initializing;
}
@@ -88,7 +93,11 @@ public abstract class TopicTransactionBufferState {
}
public boolean checkIfClosed() {
- return STATE_UPDATER.get(this) == State.Close;
+ return STATE_UPDATER.get(this) == State.Close ||
STATE_UPDATER.get(this) == State.ClosedAndCleared;
+ }
+
+ public boolean checkIfClosedAndCleared() {
+ return STATE_UPDATER.get(this) == State.ClosedAndCleared;
}
public State getState() {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
index f6e2ad04e50..7d59ab5dd14 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
@@ -87,6 +87,11 @@ public class TransactionBufferDisable implements
TransactionBuffer {
return CompletableFuture.completedFuture(null);
}
+ @Override
+ public CompletableFuture<Void> clearSnapshotAndClose() {
+ return clearSnapshot().thenCompose(__ -> closeAsync());
+ }
+
@Override
public CompletableFuture<Void> closeAsync() {
return CompletableFuture.completedFuture(null);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
index e567d1d5281..b3c4b804286 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
@@ -80,11 +80,13 @@ import
org.apache.pulsar.common.policies.data.TransactionMetadata;
import
org.apache.pulsar.common.policies.data.TransactionPendingAckInternalStats;
import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
import org.apache.pulsar.common.stats.PositionInPendingAckStats;
+import org.apache.pulsar.common.util.FutureUtil;
import
org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider;
import org.apache.pulsar.transaction.coordinator.TxnMeta;
import
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
+import org.apache.zookeeper.KeeperException;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -1110,4 +1112,32 @@ public class AdminApiTransactionTest extends
MockedPulsarServiceBaseTest {
assertEquals(persistentTopicStats.ledgers.size(),
internalStats.ledgers.size());
assertEquals(persistentTopicStats.cursors.size(),
internalStats.cursors.size());
}
+
+ @Test
+ public void testRetryDeleteTopicAfterFailedDeleteLedger() throws Exception
{
+ // Create a topic.
+ final String tpName =
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+ admin.topics().createNonPartitionedTopic(tpName);
+ Producer<String> producer =
pulsarClient.newProducer(Schema.STRING).topic(tpName).create();
+ producer.send("1");
+ producer.close();
+ // The first delete attempt should fail, since we injected an error.
+
pulsarTestContext.getMockZooKeeper().failConditional(KeeperException.Code.BADVERSION,
(op, path) -> {
+ if ("DELETE".equals(op.toString())
+ &&
path.endsWith(TopicName.get(tpName).getPersistenceNamingEncoding())) {
+ return true;
+ }
+ return false;
+ });
+ try {
+ admin.topics().delete(tpName);
+ fail("The deleting should fail because we injected an error");
+ } catch (Throwable ex) {
+ // expected
+ Throwable actEx = FutureUtil.unwrapCompletionException(ex);
+ assertTrue(actEx.getMessage().contains("BadVersionException"));
+ }
+ // The second delete attempt should succeed.
+ admin.topics().delete(tpName);
+ }
}
diff --git
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
index 9a195159377..360d82e0791 100644
---
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
+++
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
@@ -354,7 +354,9 @@ public class PulsarMockBookKeeper extends BookKeeper {
failures.add(delayFuture);
}
-
+ /**
+ * @param rc see also {@link
org.apache.bookkeeper.client.BKException.Code}.
+ */
public void failNow(int rc) {
failAfter(0, rc);
}