This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 79bbfce7bd7b8b9b33884b3a643dc4e2a2398080
Author: congbo <[email protected]>
AuthorDate: Mon Aug 9 09:49:13 2021 +0800

    [Transaction] Fix delete sub then delete pending ack. (#11023)
    
    Fix delete sub then delete pending ack managedledger.
    
    (cherry picked from commit a50fe878773a76ca974d1af6ea8b994a5df7f81a)
---
 .../broker/service/persistent/PersistentTopic.java | 97 +++++++++++++++-------
 .../pulsar/broker/service/PersistentTopicTest.java |  6 ++
 .../pulsar/broker/service/ServerCnxTest.java       | 13 +--
 .../buffer/TransactionBufferClientTest.java        |  5 +-
 .../pendingack/PendingAckPersistentTest.java       | 76 ++++++++++++++++-
 5 files changed, 161 insertions(+), 36 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 62a0701..2f9a91d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -64,6 +64,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerAlreadyClosedException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerFencedException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerTerminatedException;
+import 
org.apache.bookkeeper.mledger.ManagedLedgerException.MetadataNotFoundException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
@@ -107,6 +108,7 @@ import org.apache.pulsar.broker.stats.NamespaceStats;
 import org.apache.pulsar.broker.stats.ReplicationMetrics;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
 import 
org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable;
+import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
 import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.admin.OffloadProcessStatus;
 import org.apache.pulsar.client.api.MessageId;
@@ -147,6 +149,7 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.compaction.CompactedTopic;
 import org.apache.pulsar.compaction.CompactedTopicImpl;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
 import org.apache.pulsar.utils.StatsOutputStream;
 import org.apache.zookeeper.KeeperException;
@@ -949,7 +952,32 @@ public class PersistentTopic extends AbstractTopic
     @Override
     public CompletableFuture<Void> unsubscribe(String subscriptionName) {
         CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();
+        
getBrokerService().getManagedLedgerFactory().asyncDelete(TopicName.get(MLPendingAckStore
+                .getTransactionPendingAckStoreSuffix(topic,
+                        
Codec.encode(subscriptionName))).getPersistenceNamingEncoding(),
+                new AsyncCallbacks.DeleteLedgerCallback() {
+            @Override
+            public void deleteLedgerComplete(Object ctx) {
+                asyncDeleteCursor(subscriptionName, unsubscribeFuture);
+            }
+
+            @Override
+            public void deleteLedgerFailed(ManagedLedgerException exception, 
Object ctx) {
+                if (exception instanceof MetadataNotFoundException) {
+                    asyncDeleteCursor(subscriptionName, unsubscribeFuture);
+                    return;
+                }
+
+                unsubscribeFuture.completeExceptionally(exception);
+                log.error("[{}][{}] Error deleting subscription pending ack 
store",
+                        topic, subscriptionName, exception);
+            }
+        }, null);
+
+        return unsubscribeFuture;
+    }
 
+    private void asyncDeleteCursor(String subscriptionName, 
CompletableFuture<Void> unsubscribeFuture) {
         ledger.asyncDeleteCursor(Codec.encode(subscriptionName), new 
DeleteCursorCallback() {
             @Override
             public void deleteCursorComplete(Object ctx) {
@@ -964,13 +992,12 @@ public class PersistentTopic extends AbstractTopic
             @Override
             public void deleteCursorFailed(ManagedLedgerException exception, 
Object ctx) {
                 if (log.isDebugEnabled()) {
-                    log.debug("[{}][{}] Error deleting cursor for 
subscription", topic, subscriptionName, exception);
+                    log.debug("[{}][{}] Error deleting cursor for 
subscription",
+                            topic, subscriptionName, exception);
                 }
                 unsubscribeFuture.completeExceptionally(new 
PersistenceException(exception));
             }
         }, null);
-
-        return unsubscribeFuture;
     }
 
     void removeSubscription(String subscriptionName) {
@@ -1077,32 +1104,46 @@ public class PersistentTopic extends AbstractTopic
                             unfenceTopicToResume();
                             deleteFuture.completeExceptionally(ex);
                         } else {
-                            ledger.asyncDelete(new 
AsyncCallbacks.DeleteLedgerCallback() {
-                                @Override
-                                public void deleteLedgerComplete(Object ctx) {
-                                    brokerService.removeTopicFromCache(topic);
-
-                                    
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
-
-                                    
subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
-
-                                    
brokerService.pulsar().getTopicPoliciesService().clean(TopicName.get(topic));
-                                    log.info("[{}] Topic deleted", topic);
-                                    deleteFuture.complete(null);
-                                }
-
-                                @Override
-                                public void 
deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
-                                    if (exception.getCause() instanceof 
KeeperException.NoNodeException) {
-                                        log.info("[{}] Topic is already 
deleted {}", topic, exception.getMessage());
-                                        deleteLedgerComplete(ctx);
-                                    } else {
-                                        unfenceTopicToResume();
-                                        log.error("[{}] Error deleting topic", 
topic, exception);
-                                        deleteFuture.completeExceptionally(new 
PersistenceException(exception));
-                                    }
+                            List<CompletableFuture<Void>> subsDeleteFutures = 
new ArrayList<>();
+                            subscriptions.forEach((sub, p) -> 
subsDeleteFutures.add(unsubscribe(sub)));
+
+                            
FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> {
+                                if (e != null) {
+                                    log.error("[{}] Error deleting topic", 
topic, e);
+                                    unfenceTopicToResume();
+                                    deleteFuture.completeExceptionally(e);
+                                } else {
+                                    ledger.asyncDelete(new 
AsyncCallbacks.DeleteLedgerCallback() {
+                                        @Override
+                                        public void 
deleteLedgerComplete(Object ctx) {
+                                            
brokerService.removeTopicFromCache(topic);
+
+                                            
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
+
+                                            
subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
+
+                                            
brokerService.pulsar().getTopicPoliciesService()
+                                                    
.clean(TopicName.get(topic));
+                                            log.info("[{}] Topic deleted", 
topic);
+                                            deleteFuture.complete(null);
+                                        }
+
+                                        @Override
+                                        public void 
deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
+                                            if (exception.getCause()
+                                                    instanceof 
MetadataStoreException.NotFoundException) {
+                                                log.info("[{}] Topic is 
already deleted {}",
+                                                        topic, 
exception.getMessage());
+                                                deleteLedgerComplete(ctx);
+                                            } else {
+                                                unfenceTopicToResume();
+                                                log.error("[{}] Error deleting 
topic", topic, exception);
+                                                
deleteFuture.completeExceptionally(new PersistenceException(exception));
+                                            }
+                                        }
+                                    }, null);
                                 }
-                            }, null);
+                            });
                         }
                     });
                 } else {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 4a804b2..b53404a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -178,6 +178,12 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         mlFactoryMock = mock(ManagedLedgerFactory.class);
         doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
 
+        doAnswer(invocation -> {
+            DeleteLedgerCallback deleteLedgerCallback = 
invocation.getArgument(1);
+            deleteLedgerCallback.deleteLedgerComplete(null);
+            return null;
+        }).when(mlFactoryMock).asyncDelete(any(), any(), any());
+
         ZooKeeper mockZk = createMockZooKeeper();
         doReturn(mockZk).when(pulsar).getZkClient();
         doReturn(createMockBookKeeper(executor))
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index b2d1e54..2e2f0f2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -115,6 +115,7 @@ import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.zookeeper.ZooKeeperCache;
 import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
 import org.apache.zookeeper.ZooKeeper;
+import org.awaitility.Awaitility;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -759,14 +760,16 @@ public class ServerCnxTest {
 
         // Create producer second time
         clientCommand = Commands.newSubscribe(successTopicName, //
-                successSubName, 1 /* consumer id */, 1 /* request id */, 
SubType.Exclusive, 0,
+                successSubName, 2 /* consumer id */, 1 /* request id */, 
SubType.Exclusive, 0,
                 "test" /* consumer name */, 0 /* avoid reseting cursor */);
         channel.writeInbound(clientCommand);
 
-        Object response = getResponse();
-        assertTrue(response instanceof CommandError, "Response is not 
CommandError but " + response);
-        CommandError error = (CommandError) response;
-        assertEquals(error.getError(), ServerError.ServiceNotReady);
+        Awaitility.await().untilAsserted(() -> {
+            Object response = getResponse();
+            assertTrue(response instanceof CommandError, "Response is not 
CommandError but " + response);
+            CommandError error = (CommandError) response;
+            assertEquals(error.getError(), ServerError.ConsumerBusy);
+        });
         channel.finish();
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
index 5d088bb..6081580 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
@@ -46,6 +46,7 @@ import org.apache.pulsar.broker.service.Topic;
 import 
org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
 import 
org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl;
 import 
org.apache.pulsar.broker.transaction.coordinator.TransactionMetaStoreTestBase;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
 import 
org.apache.pulsar.client.api.transaction.TransactionBufferClientException;
@@ -90,9 +91,11 @@ public class TransactionBufferClientTest extends 
TransactionMetaStoreTestBase {
         pulsarAdmins[0].tenants().createTenant("public", new 
TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet("my-cluster")));
         pulsarAdmins[0].namespaces().createNamespace(namespace, 10);
         
pulsarAdmins[0].topics().createPartitionedTopic(partitionedTopicName.getPartitionedTopicName(),
 partitions);
+        String subName = "test";
+        
pulsarAdmins[0].topics().createSubscription(partitionedTopicName.getPartitionedTopicName(),
 subName, MessageId.latest);
         pulsarClient.newConsumer()
                 .topic(partitionedTopicName.getPartitionedTopicName())
-                .subscriptionName("test").subscribe();
+                .subscriptionName(subName).subscribe();
         tbClient = TransactionBufferClientImpl.create(
                 ((PulsarClientImpl) pulsarClient),
                 new HashedWheelTimer(new 
DefaultThreadFactory("transaction-buffer")));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index 191e4027..4619911 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.transaction.pendingack;
 
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.google.common.collect.Sets;
@@ -45,9 +46,11 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicStats;
 import org.awaitility.Awaitility;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -59,7 +62,11 @@ import org.testng.annotations.Test;
 @Slf4j
 public class PendingAckPersistentTest extends TransactionTestBase {
 
-    private final static String PENDING_ACK_REPLAY_TOPIC = 
"persistent://public/txn/pending-ack-replay";
+    private static final String PENDING_ACK_REPLAY_TOPIC = 
"persistent://public/txn/pending-ack-replay";
+
+    private static final String NAMESPACE = "public/txn";
+
+    private static final int NUM_PARTITIONS = 16;
 
     @BeforeMethod
     public void setup() throws Exception {
@@ -75,7 +82,7 @@ public class PendingAckPersistentTest extends 
TransactionTestBase {
         
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
 16);
         admin.tenants().createTenant("public",
                 new TenantInfoImpl(Sets.newHashSet(), 
Sets.newHashSet(CLUSTER_NAME)));
-        admin.namespaces().createNamespace("public/txn", 10);
+        admin.namespaces().createNamespace(NAMESPACE, 10);
         admin.topics().createNonPartitionedTopic(PENDING_ACK_REPLAY_TOPIC);
 
         pulsarClient = PulsarClient.builder()
@@ -298,4 +305,69 @@ public class PendingAckPersistentTest extends 
TransactionTestBase {
                 .until(() -> ((PositionImpl) 
managedCursor.getMarkDeletedPosition())
                         .compareTo((PositionImpl) 
managedCursor.getManagedLedger().getLastConfirmedEntry()) == -1);
     }
+
+    @Test
+    private void testDeleteSubThenDeletePendingAckManagedLedger() throws 
Exception {
+
+        String subName = "test-delete";
+
+        String topic = TopicName.get(TopicDomain.persistent.toString(),
+                NamespaceName.get(NAMESPACE), "test-delete").toString();
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Failover)
+                .enableBatchIndexAcknowledgment(true)
+                .subscribe();
+
+        consumer.close();
+
+        admin.topics().deleteSubscription(topic, subName);
+
+        List<String> topics = admin.namespaces().getTopics(NAMESPACE);
+
+        TopicStats topicStats = admin.topics().getStats(topic, false);
+
+        
assertFalse(topics.contains(MLPendingAckStore.getTransactionPendingAckStoreSuffix(topic,
 subName)));
+
+        assertTrue(topics.contains(topic));
+    }
+
+    @Test
+    private void testDeleteTopicThenDeletePendingAckManagedLedger() throws 
Exception {
+
+        String subName1 = "test-delete";
+        String subName2 = "test-delete";
+
+        String topic = TopicName.get(TopicDomain.persistent.toString(),
+                NamespaceName.get(NAMESPACE), "test-delete").toString();
+        @Cleanup
+        Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName(subName1)
+                .subscriptionType(SubscriptionType.Failover)
+                .enableBatchIndexAcknowledgment(true)
+                .subscribe();
+
+        consumer1.close();
+
+        @Cleanup
+        Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName(subName2)
+                .subscriptionType(SubscriptionType.Failover)
+                .enableBatchIndexAcknowledgment(true)
+                .subscribe();
+
+        consumer2.close();
+
+        admin.topics().delete(topic);
+
+        List<String> topics = admin.namespaces().getTopics(NAMESPACE);
+
+        
assertFalse(topics.contains(MLPendingAckStore.getTransactionPendingAckStoreSuffix(topic,
 subName1)));
+        
assertFalse(topics.contains(MLPendingAckStore.getTransactionPendingAckStoreSuffix(topic,
 subName2)));
+        assertFalse(topics.contains(topic));
+    }
 }

Reply via email to