This is an automated email from the ASF dual-hosted git repository.

rgao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d87a2304fc55828aa1a7a905c2631f594b49b764
Author: Qiang Zhao <[email protected]>
AuthorDate: Tue Mar 1 10:41:07 2022 +0800

    [Broker] Fix ``Future.join()`` causing deadlock. (#14469)
    
    Master issue #14438
    
    ### Motivation
    
    Invoking the ``join()`` method in the async method will cause some deadlock.
    
    ### Modifications
    
    - Refactor ``PersistentTopic#tryToDeletePartitionedMetadata`` to pure async.
    
    (cherry picked from commit 65318e83f8d5b4207a9398e100390800425d5433)
---
 .../broker/service/persistent/PersistentTopic.java | 78 ++++++++++++----------
 .../broker/service/PersistentTopicE2ETest.java     | 22 +++++-
 2 files changed, 63 insertions(+), 37 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 00934ad..e96fdc4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2278,42 +2278,48 @@ public class PersistentTopic extends AbstractTopic
             return CompletableFuture.completedFuture(null);
         }
         TopicName topicName = 
TopicName.get(TopicName.get(topic).getPartitionedTopicName());
-        try {
-            PartitionedTopicResources partitionedTopicResources = 
getBrokerService().pulsar().getPulsarResources()
-                    .getNamespaceResources()
-                    .getPartitionedTopicResources();
-            if (topicName.isPartitioned() && 
!partitionedTopicResources.partitionedTopicExists(topicName)) {
-                return CompletableFuture.completedFuture(null);
-            }
-            CompletableFuture<Void> deleteMetadataFuture = new 
CompletableFuture<>();
-            
getBrokerService().fetchPartitionedTopicMetadataAsync(TopicName.get(topicName.getPartitionedTopicName()))
-                    .thenAccept((metadata -> {
-                        // make sure all sub partitions were deleted
-                        for (int i = 0; i < metadata.partitions; i++) {
-                            if 
(brokerService.getPulsar().getPulsarResources().getTopicResources()
-                                    
.persistentTopicExists(topicName.getPartition(i)).join()) {
-                                throw new UnsupportedOperationException();
-                            }
-                        }
-                    }))
-                    .thenAccept((res) -> 
partitionedTopicResources.deletePartitionedTopicAsync(topicName)
-                            .thenAccept((r) -> {
-                        deleteMetadataFuture.complete(null);
-                    }).exceptionally(ex -> {
-                        
deleteMetadataFuture.completeExceptionally(ex.getCause());
-                        return null;
-                    }))
-                    .exceptionally((e) -> {
-                        if (!(e.getCause() instanceof 
UnsupportedOperationException)) {
-                            log.error("delete metadata fail", e);
-                        }
-                        deleteMetadataFuture.complete(null);
-                        return null;
-                    });
-            return deleteMetadataFuture;
-        } catch (Exception e) {
-            return FutureUtil.failedFuture(e);
-        }
+        PartitionedTopicResources partitionedTopicResources = 
getBrokerService().pulsar().getPulsarResources()
+                .getNamespaceResources()
+                .getPartitionedTopicResources();
+        return partitionedTopicResources.partitionedTopicExistsAsync(topicName)
+                .thenCompose(partitionedTopicExist -> {
+                    if (!partitionedTopicExist) {
+                        return CompletableFuture.completedFuture(null);
+                    } else {
+                        return getBrokerService()
+                                .fetchPartitionedTopicMetadataAsync(topicName)
+                                .thenCompose((metadata -> {
+                                    List<CompletableFuture<Boolean>> 
persistentTopicExists =
+                                            new 
ArrayList<>(metadata.partitions);
+                                    for (int i = 0; i < metadata.partitions; 
i++) {
+                                        
persistentTopicExists.add(brokerService.getPulsar()
+                                                
.getPulsarResources().getTopicResources()
+                                                
.persistentTopicExists(topicName.getPartition(i)));
+                                    }
+                                    List<CompletableFuture<Boolean>> 
unmodifiablePersistentTopicExists =
+                                            
Collections.unmodifiableList(persistentTopicExists);
+                                    return 
FutureUtil.waitForAll(unmodifiablePersistentTopicExists)
+                                            .thenCompose(unused -> {
+                                                // make sure all sub 
partitions were deleted after all future complete
+                                                Optional<Boolean> 
anyExistPartition = unmodifiablePersistentTopicExists
+                                                        .stream()
+                                                        
.map(CompletableFuture::join)
+                                                        .filter(topicExist -> 
topicExist)
+                                                        .findAny();
+                                                if 
(anyExistPartition.isPresent()) {
+                                                    log.error("[{}] Delete 
topic metadata failed because"
+                                                            + " another 
partition exist.", topicName);
+                                                    throw new 
UnsupportedOperationException(
+                                                            
String.format("Another partition exists for [%s].",
+                                                                    
topicName));
+                                                } else {
+                                                    return 
partitionedTopicResources
+                                                            
.deletePartitionedTopicAsync(topicName);
+                                                }
+                                            });
+                                }));
+                    }
+                });
     }
 
     @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index 42512b0..daf0ed7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -96,6 +96,7 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
     @BeforeMethod(alwaysRun = true)
     @Override
     protected void setup() throws Exception {
+        conf.setBrokerDeleteInactivePartitionedTopicMetadataEnabled(true);
         super.baseSetup();
     }
 
@@ -617,8 +618,27 @@ public class PersistentTopicE2ETest extends BrokerTestBase 
{
 
         runGC();
         
assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
-    }
 
+        // write again, the topic will be available
+        Producer<byte[]> producer2 = 
pulsarClient.newProducer().topic(topicName).create();
+        producer2.close();
+
+        
assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
+
+        // 6. Test for partitioned topic to delete the partitioned metadata
+        String topicGc = "persistent://prop/ns-abc/topic-gc";
+        int partitions = 5;
+        admin.topics().createPartitionedTopic(topicGc, partitions);
+        Producer<byte[]> producer3 = 
pulsarClient.newProducer().topic(topicGc).create();
+        producer3.close();
+        assertEquals(partitions, 
pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(
+                TopicName.get(topicGc)).join().partitions);
+        runGC();
+        Awaitility.await().untilAsserted(()-> {
+            
assertEquals(pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(
+                    TopicName.get(topicGc)).join().partitions, 0);
+        });
+    }
     @Data
     @ToString
     @EqualsAndHashCode

Reply via email to