This is an automated email from the ASF dual-hosted git repository. rgao pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d87a2304fc55828aa1a7a905c2631f594b49b764 Author: Qiang Zhao <[email protected]> AuthorDate: Tue Mar 1 10:41:07 2022 +0800 [Broker] Fix ``Future.join()`` causing deadlock. (#14469) Master issue #14438 ### Motivation Invoking the ``join()`` method inside an asynchronous method can cause a deadlock. ### Modifications - Refactor ``PersistentTopic#tryToDeletePartitionedMetadata`` to be purely asynchronous. (cherry picked from commit 65318e83f8d5b4207a9398e100390800425d5433) --- .../broker/service/persistent/PersistentTopic.java | 78 ++++++++++++---------- .../broker/service/PersistentTopicE2ETest.java | 22 +++++- 2 files changed, 63 insertions(+), 37 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 00934ad..e96fdc4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2278,42 +2278,48 @@ public class PersistentTopic extends AbstractTopic return CompletableFuture.completedFuture(null); } TopicName topicName = TopicName.get(TopicName.get(topic).getPartitionedTopicName()); - try { - PartitionedTopicResources partitionedTopicResources = getBrokerService().pulsar().getPulsarResources() - .getNamespaceResources() - .getPartitionedTopicResources(); - if (topicName.isPartitioned() && !partitionedTopicResources.partitionedTopicExists(topicName)) { - return CompletableFuture.completedFuture(null); - } - CompletableFuture<Void> deleteMetadataFuture = new CompletableFuture<>(); - getBrokerService().fetchPartitionedTopicMetadataAsync(TopicName.get(topicName.getPartitionedTopicName())) - .thenAccept((metadata -> { - // make sure all sub partitions were deleted - for (int i = 0; i < metadata.partitions; i++) { - if 
(brokerService.getPulsar().getPulsarResources().getTopicResources() - .persistentTopicExists(topicName.getPartition(i)).join()) { - throw new UnsupportedOperationException(); - } - } - })) - .thenAccept((res) -> partitionedTopicResources.deletePartitionedTopicAsync(topicName) - .thenAccept((r) -> { - deleteMetadataFuture.complete(null); - }).exceptionally(ex -> { - deleteMetadataFuture.completeExceptionally(ex.getCause()); - return null; - })) - .exceptionally((e) -> { - if (!(e.getCause() instanceof UnsupportedOperationException)) { - log.error("delete metadata fail", e); - } - deleteMetadataFuture.complete(null); - return null; - }); - return deleteMetadataFuture; - } catch (Exception e) { - return FutureUtil.failedFuture(e); - } + PartitionedTopicResources partitionedTopicResources = getBrokerService().pulsar().getPulsarResources() + .getNamespaceResources() + .getPartitionedTopicResources(); + return partitionedTopicResources.partitionedTopicExistsAsync(topicName) + .thenCompose(partitionedTopicExist -> { + if (!partitionedTopicExist) { + return CompletableFuture.completedFuture(null); + } else { + return getBrokerService() + .fetchPartitionedTopicMetadataAsync(topicName) + .thenCompose((metadata -> { + List<CompletableFuture<Boolean>> persistentTopicExists = + new ArrayList<>(metadata.partitions); + for (int i = 0; i < metadata.partitions; i++) { + persistentTopicExists.add(brokerService.getPulsar() + .getPulsarResources().getTopicResources() + .persistentTopicExists(topicName.getPartition(i))); + } + List<CompletableFuture<Boolean>> unmodifiablePersistentTopicExists = + Collections.unmodifiableList(persistentTopicExists); + return FutureUtil.waitForAll(unmodifiablePersistentTopicExists) + .thenCompose(unused -> { + // make sure all sub partitions were deleted after all future complete + Optional<Boolean> anyExistPartition = unmodifiablePersistentTopicExists + .stream() + .map(CompletableFuture::join) + .filter(topicExist -> topicExist) + .findAny(); + if 
(anyExistPartition.isPresent()) { + log.error("[{}] Delete topic metadata failed because" + + " another partition exist.", topicName); + throw new UnsupportedOperationException( + String.format("Another partition exists for [%s].", + topicName)); + } else { + return partitionedTopicResources + .deletePartitionedTopicAsync(topicName); + } + }); + })); + } + }); } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java index 42512b0..daf0ed7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java @@ -96,6 +96,7 @@ public class PersistentTopicE2ETest extends BrokerTestBase { @BeforeMethod(alwaysRun = true) @Override protected void setup() throws Exception { + conf.setBrokerDeleteInactivePartitionedTopicMetadataEnabled(true); super.baseSetup(); } @@ -617,8 +618,27 @@ public class PersistentTopicE2ETest extends BrokerTestBase { runGC(); assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); - } + // write again, the topic will be available + Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName).create(); + producer2.close(); + + assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); + + // 6. 
Test for partitioned topic to delete the partitioned metadata + String topicGc = "persistent://prop/ns-abc/topic-gc"; + int partitions = 5; + admin.topics().createPartitionedTopic(topicGc, partitions); + Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topicGc).create(); + producer3.close(); + assertEquals(partitions, pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync( + TopicName.get(topicGc)).join().partitions); + runGC(); + Awaitility.await().untilAsserted(()-> { + assertEquals(pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync( + TopicName.get(topicGc)).join().partitions, 0); + }); + } @Data @ToString @EqualsAndHashCode
