This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a50fe87 [Transaction] Fix delete sub then delete pending ack. (#11023)
a50fe87 is described below
commit a50fe878773a76ca974d1af6ea8b994a5df7f81a
Author: congbo <[email protected]>
AuthorDate: Mon Aug 9 09:49:13 2021 +0800
[Transaction] Fix delete sub then delete pending ack. (#11023)
When a subscription is deleted (directly, or as part of deleting its topic), also delete that subscription's pending ack managed ledger.
---
.../broker/service/persistent/PersistentTopic.java | 96 +++++++++++++++-------
.../pulsar/broker/service/PersistentTopicTest.java | 6 ++
.../pulsar/broker/service/ServerCnxTest.java | 4 +-
.../buffer/TransactionBufferClientTest.java | 5 +-
.../pendingack/PendingAckPersistentTest.java | 74 ++++++++++++++++-
5 files changed, 153 insertions(+), 32 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index c03457c..1f11e14 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -64,6 +64,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
import
org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerAlreadyClosedException;
import
org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerFencedException;
import
org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerTerminatedException;
+import
org.apache.bookkeeper.mledger.ManagedLedgerException.MetadataNotFoundException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
@@ -107,6 +108,7 @@ import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.broker.stats.ReplicationMetrics;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import
org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable;
+import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.OffloadProcessStatus;
import org.apache.pulsar.client.api.MessageId;
@@ -941,7 +943,32 @@ public class PersistentTopic extends AbstractTopic
@Override
public CompletableFuture<Void> unsubscribe(String subscriptionName) {
CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();
+
getBrokerService().getManagedLedgerFactory().asyncDelete(TopicName.get(MLPendingAckStore
+ .getTransactionPendingAckStoreSuffix(topic,
+
Codec.encode(subscriptionName))).getPersistenceNamingEncoding(),
+ new AsyncCallbacks.DeleteLedgerCallback() {
+ @Override
+ public void deleteLedgerComplete(Object ctx) {
+ asyncDeleteCursor(subscriptionName, unsubscribeFuture);
+ }
+
+ @Override
+ public void deleteLedgerFailed(ManagedLedgerException exception,
Object ctx) {
+ if (exception instanceof MetadataNotFoundException) {
+ asyncDeleteCursor(subscriptionName, unsubscribeFuture);
+ return;
+ }
+ unsubscribeFuture.completeExceptionally(exception);
+ log.error("[{}][{}] Error deleting subscription pending ack
store",
+ topic, subscriptionName, exception);
+ }
+ }, null);
+
+ return unsubscribeFuture;
+ }
+
+ private void asyncDeleteCursor(String subscriptionName,
CompletableFuture<Void> unsubscribeFuture) {
ledger.asyncDeleteCursor(Codec.encode(subscriptionName), new
DeleteCursorCallback() {
@Override
public void deleteCursorComplete(Object ctx) {
@@ -956,13 +983,12 @@ public class PersistentTopic extends AbstractTopic
@Override
public void deleteCursorFailed(ManagedLedgerException exception,
Object ctx) {
if (log.isDebugEnabled()) {
- log.debug("[{}][{}] Error deleting cursor for
subscription", topic, subscriptionName, exception);
+ log.debug("[{}][{}] Error deleting cursor for
subscription",
+ topic, subscriptionName, exception);
}
unsubscribeFuture.completeExceptionally(new
PersistenceException(exception));
}
}, null);
-
- return unsubscribeFuture;
}
void removeSubscription(String subscriptionName) {
@@ -1069,32 +1095,46 @@ public class PersistentTopic extends AbstractTopic
unfenceTopicToResume();
deleteFuture.completeExceptionally(ex);
} else {
- ledger.asyncDelete(new
AsyncCallbacks.DeleteLedgerCallback() {
- @Override
- public void deleteLedgerComplete(Object ctx) {
- brokerService.removeTopicFromCache(topic);
-
-
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
-
-
subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
-
-
brokerService.pulsar().getTopicPoliciesService().clean(TopicName.get(topic));
- log.info("[{}] Topic deleted", topic);
- deleteFuture.complete(null);
+ List<CompletableFuture<Void>> subsDeleteFutures =
new ArrayList<>();
+ subscriptions.forEach((sub, p) ->
subsDeleteFutures.add(unsubscribe(sub)));
+
+
FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> {
+ if (e != null) {
+ log.error("[{}] Error deleting topic",
topic, e);
+ unfenceTopicToResume();
+ deleteFuture.completeExceptionally(e);
+ } else {
+ ledger.asyncDelete(new
AsyncCallbacks.DeleteLedgerCallback() {
+ @Override
+ public void
deleteLedgerComplete(Object ctx) {
+
brokerService.removeTopicFromCache(topic);
+
+
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
+
+
subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
+
+
brokerService.pulsar().getTopicPoliciesService()
+
.clean(TopicName.get(topic));
+ log.info("[{}] Topic deleted",
topic);
+ deleteFuture.complete(null);
+ }
+
+ @Override
+ public void
deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
+ if (exception.getCause()
+ instanceof
MetadataStoreException.NotFoundException) {
+ log.info("[{}] Topic is
already deleted {}",
+ topic,
exception.getMessage());
+ deleteLedgerComplete(ctx);
+ } else {
+ unfenceTopicToResume();
+ log.error("[{}] Error deleting
topic", topic, exception);
+
deleteFuture.completeExceptionally(new PersistenceException(exception));
+ }
+ }
+ }, null);
}
-
- @Override
- public void
deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
- if (exception.getCause() instanceof
MetadataStoreException.NotFoundException) {
- log.info("[{}] Topic is already
deleted {}", topic, exception.getMessage());
- deleteLedgerComplete(ctx);
- } else {
- unfenceTopicToResume();
- log.error("[{}] Error deleting topic",
topic, exception);
- deleteFuture.completeExceptionally(new
PersistenceException(exception));
- }
- }
- }, null);
+ });
}
});
} else {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 5c074fc..2605a58 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -183,6 +183,12 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
doReturn(mock(PulsarClientImpl.class)).when(pulsar).getClient();
+ doAnswer(invocation -> {
+ DeleteLedgerCallback deleteLedgerCallback =
invocation.getArgument(1);
+ deleteLedgerCallback.deleteLedgerComplete(null);
+ return null;
+ }).when(mlFactoryMock).asyncDelete(any(), any(), any());
+
ZooKeeper mockZk = createMockZooKeeper();
doReturn(mockZk).when(pulsar).getZkClient();
doReturn(createMockBookKeeper(executor))
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index f16f7bb..d670f50 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -772,7 +772,7 @@ public class ServerCnxTest {
// Create producer second time
clientCommand = Commands.newSubscribe(successTopicName, //
- successSubName, 1 /* consumer id */, 1 /* request id */,
SubType.Exclusive, 0,
+ successSubName, 2 /* consumer id */, 1 /* request id */,
SubType.Exclusive, 0,
"test" /* consumer name */, 0 /* avoid reseting cursor */);
channel.writeInbound(clientCommand);
@@ -780,7 +780,7 @@ public class ServerCnxTest {
Object response = getResponse();
assertTrue(response instanceof CommandError, "Response is not
CommandError but " + response);
CommandError error = (CommandError) response;
- assertEquals(error.getError(), ServerError.ServiceNotReady);
+ assertEquals(error.getError(), ServerError.ConsumerBusy);
});
channel.finish();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
index 2ad16a7..ba73034 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
@@ -46,6 +46,7 @@ import org.apache.pulsar.broker.service.Topic;
import
org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
import
org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl;
import
org.apache.pulsar.broker.transaction.coordinator.TransactionMetaStoreTestBase;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
import
org.apache.pulsar.client.api.transaction.TransactionBufferClientException;
@@ -90,9 +91,11 @@ public class TransactionBufferClientTest extends
TransactionMetaStoreTestBase {
pulsarAdmins[0].tenants().createTenant("public", new
TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet("my-cluster")));
pulsarAdmins[0].namespaces().createNamespace(namespace, 10);
pulsarAdmins[0].topics().createPartitionedTopic(partitionedTopicName.getPartitionedTopicName(),
partitions);
+ String subName = "test";
+
pulsarAdmins[0].topics().createSubscription(partitionedTopicName.getPartitionedTopicName(),
subName, MessageId.latest);
pulsarClient.newConsumer()
.topic(partitionedTopicName.getPartitionedTopicName())
- .subscriptionName("test").subscribe();
+ .subscriptionName(subName).subscribe();
tbClient = TransactionBufferClientImpl.create(
((PulsarClientImpl) pulsarClient),
new HashedWheelTimer(new
DefaultThreadFactory("transaction-buffer")));
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index 73f0ce7..3820ebc 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.transaction.pendingack;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
@@ -45,9 +46,11 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicStats;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -60,7 +63,11 @@ import org.testng.annotations.Test;
public class PendingAckPersistentTest extends TransactionTestBase {
private static final String PENDING_ACK_REPLAY_TOPIC =
"persistent://public/txn/pending-ack-replay";
+
+ private static final String NAMESPACE = "public/txn";
+
private static final int NUM_PARTITIONS = 16;
+
@BeforeMethod
public void setup() throws Exception {
setBrokerCount(1);
@@ -75,7 +82,7 @@ public class PendingAckPersistentTest extends
TransactionTestBase {
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
16);
admin.tenants().createTenant("public",
new TenantInfoImpl(Sets.newHashSet(),
Sets.newHashSet(CLUSTER_NAME)));
- admin.namespaces().createNamespace("public/txn", 10);
+ admin.namespaces().createNamespace(NAMESPACE, 10);
admin.topics().createNonPartitionedTopic(PENDING_ACK_REPLAY_TOPIC);
pulsarClient = PulsarClient.builder()
@@ -298,4 +305,69 @@ public class PendingAckPersistentTest extends
TransactionTestBase {
.until(() -> ((PositionImpl)
managedCursor.getMarkDeletedPosition())
.compareTo((PositionImpl)
managedCursor.getManagedLedger().getLastConfirmedEntry()) == -1);
}
+
+ @Test
+ private void testDeleteSubThenDeletePendingAckManagedLedger() throws
Exception {
+
+ String subName = "test-delete";
+
+ String topic = TopicName.get(TopicDomain.persistent.toString(),
+ NamespaceName.get(NAMESPACE), "test-delete").toString();
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Failover)
+ .enableBatchIndexAcknowledgment(true)
+ .subscribe();
+
+ consumer.close();
+
+ admin.topics().deleteSubscription(topic, subName);
+
+ List<String> topics = admin.namespaces().getTopics(NAMESPACE);
+
+ TopicStats topicStats = admin.topics().getStats(topic, false);
+
+
assertFalse(topics.contains(MLPendingAckStore.getTransactionPendingAckStoreSuffix(topic,
subName)));
+
+ assertTrue(topics.contains(topic));
+ }
+
+ @Test
+ private void testDeleteTopicThenDeletePendingAckManagedLedger() throws
Exception {
+
+ String subName1 = "test-delete";
+ String subName2 = "test-delete";
+
+ String topic = TopicName.get(TopicDomain.persistent.toString(),
+ NamespaceName.get(NAMESPACE), "test-delete").toString();
+ @Cleanup
+ Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName(subName1)
+ .subscriptionType(SubscriptionType.Failover)
+ .enableBatchIndexAcknowledgment(true)
+ .subscribe();
+
+ consumer1.close();
+
+ @Cleanup
+ Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName(subName2)
+ .subscriptionType(SubscriptionType.Failover)
+ .enableBatchIndexAcknowledgment(true)
+ .subscribe();
+
+ consumer2.close();
+
+ admin.topics().delete(topic);
+
+ List<String> topics = admin.namespaces().getTopics(NAMESPACE);
+
+
assertFalse(topics.contains(MLPendingAckStore.getTransactionPendingAckStoreSuffix(topic,
subName1)));
+
assertFalse(topics.contains(MLPendingAckStore.getTransactionPendingAckStoreSuffix(topic,
subName2)));
+ assertFalse(topics.contains(topic));
+ }
}