This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 93afd89b047 [fix][broker] One topic can be closed multiple times 
concurrently (#17524)
93afd89b047 is described below

commit 93afd89b047ac56d3b7e476f578993197cf41935
Author: fengyubiao <[email protected]>
AuthorDate: Mon Apr 29 13:40:18 2024 +0800

    [fix][broker] One topic can be closed multiple times concurrently (#17524)
---
 .../broker/service/persistent/PersistentTopic.java | 199 +++++++++++++++++----
 .../broker/service/OneWayReplicatorTest.java       |  21 ++-
 .../service/persistent/PersistentTopicTest.java    |  81 ++++++++-
 .../org/apache/pulsar/common/util/FutureUtil.java  |  33 +++-
 4 files changed, 290 insertions(+), 44 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 95a2b64908a..22041326ba2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -50,6 +50,7 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
@@ -276,6 +277,8 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
     @Getter
     private final ExecutorService orderedExecutor;
 
+    private volatile CloseFutures closeFutures;
+
     @Getter
     private final PersistentTopicMetrics persistentTopicMetrics = new 
PersistentTopicMetrics();
 
@@ -299,6 +302,50 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         Long estimatedOldestUnacknowledgedMessageTimestamp;
     }
 
+    /***
+     * We use 3 futures to prevent a new closing if there is an in-progress 
deletion or closing.  We make Pulsar return
+     * the in-progress one when it is called the second time.
+     *
+     * The topic closing will be called in the below scenarios:
+     * 1. Calling "pulsar-admin topics unload". Relate to {@link 
CloseFutures#waitDisconnectClients}.
+     * 2. Namespace bundle transfer or unloading.
+     *   a. The unloading topic triggered by unloading namespace bundles will 
not wait for clients disconnect. Relate
+     *     to {@link CloseFutures#notWaitDisconnectClients}.
+     *   b. The unloading topic triggered by unloading namespace bundles was 
separated into two steps when using
+     *     {@link ExtensibleLoadManagerImpl}.
+     *     b-1. step-1: fence the topic on the original Broker, and do not 
trigger reconnections of clients. Relate
+     *       to {@link CloseFutures#transferring}. This step is a half closing.
+     *     b-2. step-2: send the owner broker information to clients and 
disconnect clients. Relate
+     *       to {@link CloseFutures#notWaitDisconnectClients}.
+     *
+     * The three futures will be set according to the below rules:
+     * Event: Topic close.
+     * - If the first one closing is called by "close and not disconnect 
clients":
+     *   - {@link CloseFutures#transferring} will be initialized as "close and 
not disconnect clients".
+     *   - {@link CloseFutures#waitDisconnectClients} and {@link 
CloseFutures#notWaitDisconnectClients} will be null,
+     *     the second closing will do a new close after {@link 
CloseFutures#transferring} is completed.
+     * - If the first one closing is called by "close and not wait for clients 
disconnect":
+     *   - {@link CloseFutures#waitDisconnectClients} will be initialized as 
"waiting for clients disconnect".
+     *   - {@link CloseFutures#notWaitDisconnectClients} and {@link 
CloseFutures#transferring} will be
+     *     initialized as "not waiting for clients disconnect".
+     * - If the first one closing is called by "close and wait for clients 
disconnect", the three futures will be
+     *   initialized as "waiting for clients disconnect".
+     * Event: Topic delete.
+     *  the three futures will be initialized as "waiting for clients 
disconnect".
+     */
+    private class CloseFutures {
+        private final CompletableFuture<Void> transferring;
+        private final CompletableFuture<Void> notWaitDisconnectClients;
+        private final CompletableFuture<Void> waitDisconnectClients;
+
+        public CloseFutures(CompletableFuture<Void> transferring, 
CompletableFuture<Void> waitDisconnectClients,
+                            CompletableFuture<Void> notWaitDisconnectClients) {
+            this.transferring = transferring;
+            this.waitDisconnectClients = waitDisconnectClients;
+            this.notWaitDisconnectClients = notWaitDisconnectClients;
+        }
+    }
+
     private static class TopicStatsHelper {
         public double averageMsgSize;
         public double aggMsgRateIn;
@@ -1417,8 +1464,11 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             }
 
             fenceTopicToCloseOrDelete(); // Avoid clients reconnections while 
deleting
+            // Mark the progress of close to prevent close calling 
concurrently.
+            this.closeFutures =
+                    new CloseFutures(new CompletableFuture(), new 
CompletableFuture(), new CompletableFuture());
 
-            return 
getBrokerService().getPulsar().getPulsarResources().getNamespaceResources()
+            CompletableFuture<Void> res = 
getBrokerService().getPulsar().getPulsarResources().getNamespaceResources()
                         
.getPartitionedTopicResources().runWithMarkDeleteAsync(TopicName.get(topic), () 
-> {
                 CompletableFuture<Void> deleteFuture = new 
CompletableFuture<>();
 
@@ -1528,6 +1578,11 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                         unfenceTopicToResume();
                     }
                 });
+
+            FutureUtil.completeAfter(closeFutures.transferring, res);
+            FutureUtil.completeAfter(closeFutures.notWaitDisconnectClients, 
res);
+            FutureUtil.completeAfter(closeFutures.waitDisconnectClients, res);
+            return res;
         } finally {
             lock.writeLock().unlock();
         }
@@ -1543,6 +1598,12 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         return close(true, closeWithoutWaitingClientDisconnect);
     }
 
+    private enum CloseTypes {
+        transferring,
+        notWaitDisconnectClients,
+        waitDisconnectClients;
+    }
+
     /**
      * Close this topic - close all producers and subscriptions associated 
with this topic.
      *
@@ -1553,32 +1614,57 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
     @Override
     public CompletableFuture<Void> close(
             boolean disconnectClients, boolean 
closeWithoutWaitingClientDisconnect) {
-        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
-
         lock.writeLock().lock();
-        try {
-            if (!disconnectClients) {
-                transferring = true;
-            }
+        // Choose the close type.
+        CloseTypes closeType;
+        if (!disconnectClients) {
+            closeType = CloseTypes.transferring;
+        } else if (closeWithoutWaitingClientDisconnect) {
+            closeType = CloseTypes.notWaitDisconnectClients;
+        } else {
             // closing managed-ledger waits until all 
producers/consumers/replicators get closed. Sometimes, broker
             // forcefully wants to close managed-ledger without waiting all 
resources to be closed.
-            if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
-                fenceTopicToCloseOrDelete();
+            closeType = CloseTypes.waitDisconnectClients;
+        }
+        /** Maybe there is an in-progress half closing task. See section 
2-b-1 of {@link CloseFutures}. **/
+        CompletableFuture<Void> inProgressTransferCloseTask = null;
+        try {
+            // Return in-progress future if exists.
+            if (isClosingOrDeleting) {
+                if (closeType == CloseTypes.transferring) {
+                    return closeFutures.transferring;
+                }
+                if (closeType == CloseTypes.notWaitDisconnectClients && 
closeFutures.notWaitDisconnectClients != null) {
+                    return closeFutures.notWaitDisconnectClients;
+                }
+                if (closeType == CloseTypes.waitDisconnectClients && 
closeFutures.waitDisconnectClients != null) {
+                    return closeFutures.waitDisconnectClients;
+                }
+                if (transferring) {
+                    inProgressTransferCloseTask = closeFutures.transferring;
+                }
+            }
+            fenceTopicToCloseOrDelete();
+            if (closeType == CloseTypes.transferring) {
+                transferring = true;
+                this.closeFutures = new CloseFutures(new CompletableFuture(), 
null, null);
             } else {
-                log.warn("[{}] Topic is already being closed or deleted", 
topic);
-                closeFuture.completeExceptionally(new 
TopicFencedException("Topic is already fenced"));
-                return closeFuture;
+                this.closeFutures =
+                        new CloseFutures(new CompletableFuture(), new 
CompletableFuture(), new CompletableFuture());
             }
         } finally {
             lock.writeLock().unlock();
         }
 
         List<CompletableFuture<Void>> futures = new ArrayList<>();
+        if (inProgressTransferCloseTask != null) {
+            futures.add(inProgressTransferCloseTask);
+        }
 
         futures.add(transactionBuffer.closeAsync());
         replicators.forEach((cluster, replicator) -> 
futures.add(replicator.terminate()));
         shadowReplicators.forEach((__, replicator) -> 
futures.add(replicator.terminate()));
-        if (disconnectClients) {
+        if (closeType != CloseTypes.transferring) {
             futures.add(ExtensibleLoadManagerImpl.getAssignedBrokerLookupData(
                 brokerService.getPulsar(), topic).thenAccept(lookupData -> {
                     producers.values().forEach(producer -> 
futures.add(producer.disconnect(lookupData)));
@@ -1616,40 +1702,79 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             }
         }
 
-        CompletableFuture<Void> clientCloseFuture = 
closeWithoutWaitingClientDisconnect
-                ? CompletableFuture.completedFuture(null)
-                : FutureUtil.waitForAll(futures);
+        CompletableFuture<Void> disconnectClientsInCurrentCall = null;
+        // Note: "disconnectClientsToCache" holds a nullable value; it is null 
when the close type is transferring.
+        AtomicReference<CompletableFuture<Void>> disconnectClientsToCache = 
new AtomicReference<>();
+        switch (closeType) {
+            case transferring -> {
+                disconnectClientsInCurrentCall = 
FutureUtil.waitForAll(futures);
+                break;
+            }
+            case notWaitDisconnectClients -> {
+                disconnectClientsInCurrentCall = 
CompletableFuture.completedFuture(null);
+                disconnectClientsToCache.set(FutureUtil.waitForAll(futures));
+                break;
+            }
+            case waitDisconnectClients -> {
+                disconnectClientsInCurrentCall = 
FutureUtil.waitForAll(futures);
+                disconnectClientsToCache.set(disconnectClientsInCurrentCall);
+            }
+        }
 
-        clientCloseFuture.thenRun(() -> {
-            // After having disconnected all producers/consumers, close the 
managed ledger
-            ledger.asyncClose(new CloseCallback() {
-                @Override
-                public void closeComplete(Object ctx) {
-                    if (disconnectClients) {
-                        // Everything is now closed, remove the topic from map
-                        disposeTopic(closeFuture);
-                    } else {
-                        closeFuture.complete(null);
-                    }
+        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+        Runnable closeLedgerAfterCloseClients = (() -> ledger.asyncClose(new 
CloseCallback() {
+            @Override
+            public void closeComplete(Object ctx) {
+                if (closeType != CloseTypes.transferring) {
+                    // Everything is now closed, remove the topic from map
+                    disposeTopic(closeFuture);
+                } else {
+                    closeFuture.complete(null);
                 }
+            }
 
-                @Override
-                public void closeFailed(ManagedLedgerException exception, 
Object ctx) {
-                    log.error("[{}] Failed to close managed ledger, proceeding 
anyway.", topic, exception);
-                    if (disconnectClients) {
-                        disposeTopic(closeFuture);
-                    } else {
-                        closeFuture.complete(null);
-                    }
+            @Override
+            public void closeFailed(ManagedLedgerException exception, Object 
ctx) {
+                log.error("[{}] Failed to close managed ledger, proceeding 
anyway.", topic, exception);
+                if (closeType != CloseTypes.transferring) {
+                    disposeTopic(closeFuture);
+                } else {
+                    closeFuture.complete(null);
                 }
-            }, null);
-        }).exceptionally(exception -> {
+            }
+        }, null));
+
+        
disconnectClientsInCurrentCall.thenRun(closeLedgerAfterCloseClients).exceptionally(exception
 -> {
             log.error("[{}] Error closing topic", topic, exception);
             unfenceTopicToResume();
             closeFuture.completeExceptionally(exception);
             return null;
         });
 
+        switch (closeType) {
+            case transferring -> {
+                FutureUtil.completeAfterAll(closeFutures.transferring, 
closeFuture);
+                break;
+            }
+            case notWaitDisconnectClients -> {
+                FutureUtil.completeAfterAll(closeFutures.transferring, 
closeFuture);
+                
FutureUtil.completeAfter(closeFutures.notWaitDisconnectClients, closeFuture);
+                FutureUtil.completeAfterAll(closeFutures.waitDisconnectClients,
+                        closeFuture.thenCompose(ignore -> 
disconnectClientsToCache.get().exceptionally(ex -> {
+                            // Since the managed ledger has been closed, eat 
the error of clients disconnection.
+                            log.error("[{}] Closed managed ledger, but 
disconnect clients failed,"
+                                    + " this topic will be marked closed", 
topic, ex);
+                            return null;
+                        })));
+                break;
+            }
+            case waitDisconnectClients -> {
+                FutureUtil.completeAfterAll(closeFutures.transferring, 
closeFuture);
+                
FutureUtil.completeAfter(closeFutures.notWaitDisconnectClients, closeFuture);
+                
FutureUtil.completeAfterAll(closeFutures.waitDisconnectClients, closeFuture);
+            }
+        }
+
         return closeFuture;
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index 9b8b567af08..eb31c13b0d5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.google.common.collect.Sets;
@@ -226,7 +227,7 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
         });
     }
 
-    private void injectMockReplicatorProducerBuilder(
+    private Runnable injectMockReplicatorProducerBuilder(
                                 BiFunction<ProducerConfigurationData, 
ProducerImpl, ProducerImpl> producerDecorator)
             throws Exception {
         String cluster2 = pulsar2.getConfig().getClusterName();
@@ -246,7 +247,8 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
                 replicationClients = 
WhiteboxImpl.getInternalState(brokerService, "replicationClients");
         PulsarClientImpl internalClient = (PulsarClientImpl) 
replicationClients.get(cluster2);
         PulsarClient spyClient = spy(internalClient);
-        replicationClients.put(cluster2, spyClient);
+        assertTrue(replicationClients.remove(cluster2, internalClient));
+        assertNull(replicationClients.putIfAbsent(cluster2, spyClient));
 
         // Inject producer decorator.
         doAnswer(invocation -> {
@@ -275,6 +277,12 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
             }).when(spyProducerBuilder).createAsync();
             return spyProducerBuilder;
         }).when(spyClient).newProducer(any(Schema.class));
+
+        // Return a cleanup injection task;
+        return () -> {
+            assertTrue(replicationClients.remove(cluster2, spyClient));
+            assertNull(replicationClients.putIfAbsent(cluster2, 
internalClient));
+        };
     }
 
     private SpyCursor spyCursor(PersistentTopic persistentTopic, String 
cursorName) throws Exception {
@@ -368,7 +376,7 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
         //   If the retry counter is larger than 6, the next creation will be 
slow enough to close Replicator.
         final AtomicInteger createProducerCounter = new AtomicInteger();
         final int failTimes = 6;
-        injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> 
{
+        Runnable taskToClearInjection = 
injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> {
             if (topicName.equals(producerCnf.getTopicName())) {
                 // There is a switch to determine create producer successfully 
or not.
                 if (createProducerCounter.incrementAndGet() > failTimes) {
@@ -427,6 +435,7 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
         });
 
         // cleanup.
+        taskToClearInjection.run();
         cleanupTopics(() -> {
             admin1.topics().delete(topicName);
             admin2.topics().delete(topicName);
@@ -531,7 +540,7 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
         //   If the retry counter is larger than 6, the next creation will be 
slow enough to close Replicator.
         final AtomicInteger createProducerCounter = new AtomicInteger();
         final int failTimes = 6;
-        injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> 
{
+        Runnable taskToClearInjection = 
injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> {
             if (topicName.equals(producerCnf.getTopicName())) {
                 // There is a switch to determine create producer successfully 
or not.
                 if (createProducerCounter.incrementAndGet() > failTimes) {
@@ -593,6 +602,7 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
         });
 
         // cleanup.
+        taskToClearInjection.run();
         cleanupTopics(namespaceName, () -> {
             admin1.topics().delete(topicName);
             admin2.topics().delete(topicName);
@@ -644,8 +654,9 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
             assertTrue(replicator2.producer != null && 
replicator2.producer.isConnected());
         });
 
-        // cleanup.
+        // cleanup the injection.
         persistentTopic.getProducers().remove(mockProducerName, mockProducer);
+        // cleanup.
         producer1.close();
         cleanupTopics(() -> {
             admin1.topics().delete(topicName);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index 44d24668cc3..d523586c2e2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -49,6 +49,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
+import java.util.Map;
+import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -56,6 +58,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.LedgerHandle;
@@ -322,6 +325,83 @@ public class PersistentTopicTest extends BrokerTestBase {
         }
     }
 
+    @DataProvider(name = "closeWithoutWaitingClientDisconnectInFirstBatch")
+    public Object[][] closeWithoutWaitingClientDisconnectInFirstBatch() {
+        return new Object[][]{
+                new Object[] {true},
+                new Object[] {false},
+        };
+    }
+
+    @Test(dataProvider = "closeWithoutWaitingClientDisconnectInFirstBatch")
+    public void testConcurrentClose(boolean 
closeWithoutWaitingClientDisconnectInFirstBatch) throws Exception {
+        final String topicName = "persistent://prop/ns/concurrentClose";
+        final String ns = "prop/ns";
+        admin.namespaces().createNamespace(ns, 1);
+        admin.topics().createNonPartitionedTopic(topicName);
+        final Topic topic = 
pulsar.getBrokerService().getTopicIfExists(topicName).get().get();
+        List<CompletableFuture<Void>> futureList =
+                make2ConcurrentBatchesOfClose(topic, 10, 
closeWithoutWaitingClientDisconnectInFirstBatch);
+        Map<Integer, List<CompletableFuture<Void>>> futureMap =
+                
futureList.stream().collect(Collectors.groupingBy(Objects::hashCode));
+        /**
+         * The first call: get the return value of "topic.close".
+         * The other 19 calls: get the cached value which relates to {@link 
PersistentTopic#closeFutures}.
+         */
+        assertTrue(futureMap.size() <= 3);
+        for (List list : futureMap.values()){
+            if (list.size() == 1){
+                // This is the first call, the future is the return value of 
`topic.close`.
+            } else {
+                // Two types future list: wait client close or not.
+                assertTrue(list.size() >= 9 && list.size() <= 10);
+            }
+        }
+    }
+
+    private List<CompletableFuture<Void>> make2ConcurrentBatchesOfClose(Topic 
topic, int tryTimes,
+                                                              boolean 
closeWithoutWaitingClientDisconnectInFirstBatch){
+        final List<CompletableFuture<Void>> futureList = 
Collections.synchronizedList(new ArrayList<>());
+        final List<Thread> taskList = new ArrayList<>();
+        CountDownLatch allTaskBeginLatch = new CountDownLatch(1);
+        // Call a batch of close.
+        for (int i = 0; i < tryTimes; i++) {
+            Thread thread = new Thread(() -> {
+                try {
+                    allTaskBeginLatch.await(5, TimeUnit.SECONDS);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+                
futureList.add(topic.close(closeWithoutWaitingClientDisconnectInFirstBatch));
+            });
+            thread.start();
+            taskList.add(thread);
+        }
+        // Call another batch of close.
+        for (int i = 0; i < tryTimes; i++) {
+            Thread thread = new Thread(() -> {
+                try {
+                    allTaskBeginLatch.await(5, TimeUnit.SECONDS);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+                
futureList.add(topic.close(!closeWithoutWaitingClientDisconnectInFirstBatch));
+            });
+            thread.start();
+            taskList.add(thread);
+        }
+        // Wait close task executed.
+        allTaskBeginLatch.countDown();
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()->{
+            for (Thread thread : taskList){
+                if (thread.isAlive()){
+                    return false;
+                }
+            }
+            return true;
+        });
+        return futureList;
+    }
 
     @DataProvider(name = "topicAndMetricsLevel")
     public Object[][] indexPatternTestData() {
@@ -331,7 +411,6 @@ public class PersistentTopicTest extends BrokerTestBase {
         };
     }
 
-
     @Test(dataProvider = "topicAndMetricsLevel")
     public void testDelayedDeliveryTrackerMemoryUsageMetric(String topic, 
boolean exposeTopicLevelMetrics) throws Exception {
         PulsarClient client = pulsar.getClient();
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index 6f625898535..f6fcb12f359 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.common.util;
 import com.google.common.util.concurrent.MoreExecutors;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Objects;
@@ -69,6 +70,36 @@ public class FutureUtil {
                 })));
     }
 
+    /**
+     * Make the dest future complete after another one. {@code dest} will 
be completed with the same value as
+     * {@code src}, or be completed with the same error as {@code src}.
+     */
+    public static <T> void completeAfter(final CompletableFuture<T> dest, 
CompletableFuture<T> src) {
+        src.whenComplete((v, ex) -> {
+            if (ex != null) {
+                dest.completeExceptionally(ex);
+            } else {
+                dest.complete(v);
+            }
+        });
+    }
+
+    /**
+     * Make the dest future complete after others. {@code dest} will be 
completed with a {@link Void} value
+     * if all the futures of {@code src} are completed, or be completed 
exceptionally with the same error as the first
+     * exceptionally completed future of {@code src}.
+     */
+    public static void completeAfterAll(final CompletableFuture<Void> dest,
+                                        CompletableFuture<? extends Object>... 
src) {
+        FutureUtil.waitForAll(Arrays.asList(src)).whenComplete((ignore, ex) -> 
{
+            if (ex != null) {
+                dest.completeExceptionally(ex);
+            } else {
+                dest.complete(null);
+            }
+        });
+    }
+
     /**
      * Return a future that represents the completion of any future in the 
provided Collection.
      *
@@ -131,7 +162,7 @@ public class FutureUtil {
      * @return a new CompletableFuture that is completed when all of the given 
CompletableFutures complete
      */
     public static CompletableFuture<Void> waitForAllAndSupportCancel(
-                                                    Collection<? extends 
CompletableFuture<?>> futures) {
+            Collection<? extends CompletableFuture<?>> futures) {
         CompletableFuture[] futuresArray = futures.toArray(new 
CompletableFuture[0]);
         CompletableFuture<Void> combinedFuture = 
CompletableFuture.allOf(futuresArray);
         whenCancelledOrTimedOut(combinedFuture, () -> {

Reply via email to