This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 93afd89b047 [fix][broker] One topic can be closed multiple times
concurrently (#17524)
93afd89b047 is described below
commit 93afd89b047ac56d3b7e476f578993197cf41935
Author: fengyubiao <[email protected]>
AuthorDate: Mon Apr 29 13:40:18 2024 +0800
[fix][broker] One topic can be closed multiple times concurrently (#17524)
---
.../broker/service/persistent/PersistentTopic.java | 199 +++++++++++++++++----
.../broker/service/OneWayReplicatorTest.java | 21 ++-
.../service/persistent/PersistentTopicTest.java | 81 ++++++++-
.../org/apache/pulsar/common/util/FutureUtil.java | 33 +++-
4 files changed, 290 insertions(+), 44 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 95a2b64908a..22041326ba2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -50,6 +50,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
@@ -276,6 +277,8 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
@Getter
private final ExecutorService orderedExecutor;
+ private volatile CloseFutures closeFutures;
+
@Getter
private final PersistentTopicMetrics persistentTopicMetrics = new
PersistentTopicMetrics();
@@ -299,6 +302,50 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
Long estimatedOldestUnacknowledgedMessageTimestamp;
}
+ /***
+ * We use 3 futures to prevent a new closing if there is an in-progress
deletion or closing. We make Pulsar return
+ * the in-progress one when it is called the second time.
+ *
+ * The topic closing will be called the below scenarios:
+ * 1. Calling "pulsar-admin topics unload". Relate to {@link
CloseFutures#waitDisconnectClients}.
+ * 2. Namespace bundle transfer or unloading.
+ * a. The unloading topic triggered by unloading namespace bundles will
not wait for clients disconnect. Relate
+ * to {@link CloseFutures#notWaitDisconnectClients}.
+ * b. The unloading topic triggered by unloading namespace bundles was
seperated to two steps when using
+ * {@link ExtensibleLoadManagerImpl}.
+ * b-1. step-1: fence the topic on the original Broker, and do not
trigger reconnections of clients. Relate
+ * to {@link CloseFutures#transferring}. This step is a half closing.
+ * b-2. step-2: send the owner broker information to clients and
disconnect clients. Relate
+ * to {@link CloseFutures#notWaitDisconnectClients}.
+ *
+ * The three futures will be setting as the below rule:
+ * Event: Topic close.
+ * - If the first one closing is called by "close and not disconnect
clients":
+ * - {@link CloseFutures#transferring} will be initialized as "close and
not disconnect clients".
+ * - {@link CloseFutures#waitDisconnectClients} ang {@link
CloseFutures#notWaitDisconnectClients} will be empty,
+ * the second closing will do a new close after {@link
CloseFutures#transferring} is completed.
+ * - If the first one closing is called by "close and not wait for clients
disconnect":
+ * - {@link CloseFutures#waitDisconnectClients} will be initialized as
"waiting for clients disconnect".
+ * - {@link CloseFutures#notWaitDisconnectClients} ang {@link
CloseFutures#transferring} will be
+ * initialized as "not waiting for clients disconnect" .
+ * - If the first one closing is called by "close and wait for clients
disconnect", the three futures will be
+ * initialized as "waiting for clients disconnect".
+ * Event: Topic delete.
+ * the three futures will be initialized as "waiting for clients
disconnect".
+ */
+ private class CloseFutures {
+ private final CompletableFuture<Void> transferring;
+ private final CompletableFuture<Void> notWaitDisconnectClients;
+ private final CompletableFuture<Void> waitDisconnectClients;
+
+ public CloseFutures(CompletableFuture<Void> transferring,
CompletableFuture<Void> waitDisconnectClients,
+ CompletableFuture<Void> notWaitDisconnectClients) {
+ this.transferring = transferring;
+ this.waitDisconnectClients = waitDisconnectClients;
+ this.notWaitDisconnectClients = notWaitDisconnectClients;
+ }
+ }
+
private static class TopicStatsHelper {
public double averageMsgSize;
public double aggMsgRateIn;
@@ -1417,8 +1464,11 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
}
fenceTopicToCloseOrDelete(); // Avoid clients reconnections while
deleting
+ // Mark the progress of close to prevent close calling
concurrently.
+ this.closeFutures =
+ new CloseFutures(new CompletableFuture(), new
CompletableFuture(), new CompletableFuture());
- return
getBrokerService().getPulsar().getPulsarResources().getNamespaceResources()
+ CompletableFuture<Void> res =
getBrokerService().getPulsar().getPulsarResources().getNamespaceResources()
.getPartitionedTopicResources().runWithMarkDeleteAsync(TopicName.get(topic), ()
-> {
CompletableFuture<Void> deleteFuture = new
CompletableFuture<>();
@@ -1528,6 +1578,11 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
unfenceTopicToResume();
}
});
+
+ FutureUtil.completeAfter(closeFutures.transferring, res);
+ FutureUtil.completeAfter(closeFutures.notWaitDisconnectClients,
res);
+ FutureUtil.completeAfter(closeFutures.waitDisconnectClients, res);
+ return res;
} finally {
lock.writeLock().unlock();
}
@@ -1543,6 +1598,12 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
return close(true, closeWithoutWaitingClientDisconnect);
}
+ private enum CloseTypes {
+ transferring,
+ notWaitDisconnectClients,
+ waitDisconnectClients;
+ }
+
/**
* Close this topic - close all producers and subscriptions associated
with this topic.
*
@@ -1553,32 +1614,57 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
@Override
public CompletableFuture<Void> close(
boolean disconnectClients, boolean
closeWithoutWaitingClientDisconnect) {
- CompletableFuture<Void> closeFuture = new CompletableFuture<>();
-
lock.writeLock().lock();
- try {
- if (!disconnectClients) {
- transferring = true;
- }
+ // Choose the close type.
+ CloseTypes closeType;
+ if (!disconnectClients) {
+ closeType = CloseTypes.transferring;
+ } else if (closeWithoutWaitingClientDisconnect) {
+ closeType = CloseTypes.notWaitDisconnectClients;
+ } else {
// closing managed-ledger waits until all
producers/consumers/replicators get closed. Sometimes, broker
// forcefully wants to close managed-ledger without waiting all
resources to be closed.
- if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
- fenceTopicToCloseOrDelete();
+ closeType = CloseTypes.waitDisconnectClients;
+ }
+ /** Maybe there is a in-progress half closing task. see the section
2-b-1 of {@link CloseFutures}. **/
+ CompletableFuture<Void> inProgressTransferCloseTask = null;
+ try {
+ // Return in-progress future if exists.
+ if (isClosingOrDeleting) {
+ if (closeType == CloseTypes.transferring) {
+ return closeFutures.transferring;
+ }
+ if (closeType == CloseTypes.notWaitDisconnectClients &&
closeFutures.notWaitDisconnectClients != null) {
+ return closeFutures.notWaitDisconnectClients;
+ }
+ if (closeType == CloseTypes.waitDisconnectClients &&
closeFutures.waitDisconnectClients != null) {
+ return closeFutures.waitDisconnectClients;
+ }
+ if (transferring) {
+ inProgressTransferCloseTask = closeFutures.transferring;
+ }
+ }
+ fenceTopicToCloseOrDelete();
+ if (closeType == CloseTypes.transferring) {
+ transferring = true;
+ this.closeFutures = new CloseFutures(new CompletableFuture(),
null, null);
} else {
- log.warn("[{}] Topic is already being closed or deleted",
topic);
- closeFuture.completeExceptionally(new
TopicFencedException("Topic is already fenced"));
- return closeFuture;
+ this.closeFutures =
+ new CloseFutures(new CompletableFuture(), new
CompletableFuture(), new CompletableFuture());
}
} finally {
lock.writeLock().unlock();
}
List<CompletableFuture<Void>> futures = new ArrayList<>();
+ if (inProgressTransferCloseTask != null) {
+ futures.add(inProgressTransferCloseTask);
+ }
futures.add(transactionBuffer.closeAsync());
replicators.forEach((cluster, replicator) ->
futures.add(replicator.terminate()));
shadowReplicators.forEach((__, replicator) ->
futures.add(replicator.terminate()));
- if (disconnectClients) {
+ if (closeType != CloseTypes.transferring) {
futures.add(ExtensibleLoadManagerImpl.getAssignedBrokerLookupData(
brokerService.getPulsar(), topic).thenAccept(lookupData -> {
producers.values().forEach(producer ->
futures.add(producer.disconnect(lookupData)));
@@ -1616,40 +1702,79 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
}
}
- CompletableFuture<Void> clientCloseFuture =
closeWithoutWaitingClientDisconnect
- ? CompletableFuture.completedFuture(null)
- : FutureUtil.waitForAll(futures);
+ CompletableFuture<Void> disconnectClientsInCurrentCall = null;
+ // Note: "disconnectClientsToCache" is a non-able value, it is null
when close type is transferring.
+ AtomicReference<CompletableFuture<Void>> disconnectClientsToCache =
new AtomicReference<>();
+ switch (closeType) {
+ case transferring -> {
+ disconnectClientsInCurrentCall =
FutureUtil.waitForAll(futures);
+ break;
+ }
+ case notWaitDisconnectClients -> {
+ disconnectClientsInCurrentCall =
CompletableFuture.completedFuture(null);
+ disconnectClientsToCache.set(FutureUtil.waitForAll(futures));
+ break;
+ }
+ case waitDisconnectClients -> {
+ disconnectClientsInCurrentCall =
FutureUtil.waitForAll(futures);
+ disconnectClientsToCache.set(disconnectClientsInCurrentCall);
+ }
+ }
- clientCloseFuture.thenRun(() -> {
- // After having disconnected all producers/consumers, close the
managed ledger
- ledger.asyncClose(new CloseCallback() {
- @Override
- public void closeComplete(Object ctx) {
- if (disconnectClients) {
- // Everything is now closed, remove the topic from map
- disposeTopic(closeFuture);
- } else {
- closeFuture.complete(null);
- }
+ CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+ Runnable closeLedgerAfterCloseClients = (() -> ledger.asyncClose(new
CloseCallback() {
+ @Override
+ public void closeComplete(Object ctx) {
+ if (closeType != CloseTypes.transferring) {
+ // Everything is now closed, remove the topic from map
+ disposeTopic(closeFuture);
+ } else {
+ closeFuture.complete(null);
}
+ }
- @Override
- public void closeFailed(ManagedLedgerException exception,
Object ctx) {
- log.error("[{}] Failed to close managed ledger, proceeding
anyway.", topic, exception);
- if (disconnectClients) {
- disposeTopic(closeFuture);
- } else {
- closeFuture.complete(null);
- }
+ @Override
+ public void closeFailed(ManagedLedgerException exception, Object
ctx) {
+ log.error("[{}] Failed to close managed ledger, proceeding
anyway.", topic, exception);
+ if (closeType != CloseTypes.transferring) {
+ disposeTopic(closeFuture);
+ } else {
+ closeFuture.complete(null);
}
- }, null);
- }).exceptionally(exception -> {
+ }
+ }, null));
+
+
disconnectClientsInCurrentCall.thenRun(closeLedgerAfterCloseClients).exceptionally(exception
-> {
log.error("[{}] Error closing topic", topic, exception);
unfenceTopicToResume();
closeFuture.completeExceptionally(exception);
return null;
});
+ switch (closeType) {
+ case transferring -> {
+ FutureUtil.completeAfterAll(closeFutures.transferring,
closeFuture);
+ break;
+ }
+ case notWaitDisconnectClients -> {
+ FutureUtil.completeAfterAll(closeFutures.transferring,
closeFuture);
+
FutureUtil.completeAfter(closeFutures.notWaitDisconnectClients, closeFuture);
+ FutureUtil.completeAfterAll(closeFutures.waitDisconnectClients,
+ closeFuture.thenCompose(ignore ->
disconnectClientsToCache.get().exceptionally(ex -> {
+ // Since the managed ledger has been closed, eat
the error of clients disconnection.
+ log.error("[{}] Closed managed ledger, but
disconnect clients failed,"
+ + " this topic will be marked closed",
topic, ex);
+ return null;
+ })));
+ break;
+ }
+ case waitDisconnectClients -> {
+ FutureUtil.completeAfterAll(closeFutures.transferring,
closeFuture);
+
FutureUtil.completeAfter(closeFutures.notWaitDisconnectClients, closeFuture);
+
FutureUtil.completeAfterAll(closeFutures.waitDisconnectClients, closeFuture);
+ }
+ }
+
return closeFuture;
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index 9b8b567af08..eb31c13b0d5 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
@@ -226,7 +227,7 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
});
}
- private void injectMockReplicatorProducerBuilder(
+ private Runnable injectMockReplicatorProducerBuilder(
BiFunction<ProducerConfigurationData,
ProducerImpl, ProducerImpl> producerDecorator)
throws Exception {
String cluster2 = pulsar2.getConfig().getClusterName();
@@ -246,7 +247,8 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
replicationClients =
WhiteboxImpl.getInternalState(brokerService, "replicationClients");
PulsarClientImpl internalClient = (PulsarClientImpl)
replicationClients.get(cluster2);
PulsarClient spyClient = spy(internalClient);
- replicationClients.put(cluster2, spyClient);
+ assertTrue(replicationClients.remove(cluster2, internalClient));
+ assertNull(replicationClients.putIfAbsent(cluster2, spyClient));
// Inject producer decorator.
doAnswer(invocation -> {
@@ -275,6 +277,12 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
}).when(spyProducerBuilder).createAsync();
return spyProducerBuilder;
}).when(spyClient).newProducer(any(Schema.class));
+
+ // Return a cleanup injection task;
+ return () -> {
+ assertTrue(replicationClients.remove(cluster2, spyClient));
+ assertNull(replicationClients.putIfAbsent(cluster2,
internalClient));
+ };
}
private SpyCursor spyCursor(PersistentTopic persistentTopic, String
cursorName) throws Exception {
@@ -368,7 +376,7 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
// If the retry counter is larger than 6, the next creation will be
slow enough to close Replicator.
final AtomicInteger createProducerCounter = new AtomicInteger();
final int failTimes = 6;
- injectMockReplicatorProducerBuilder((producerCnf, originalProducer) ->
{
+ Runnable taskToClearInjection =
injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> {
if (topicName.equals(producerCnf.getTopicName())) {
// There is a switch to determine create producer successfully
or not.
if (createProducerCounter.incrementAndGet() > failTimes) {
@@ -427,6 +435,7 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
});
// cleanup.
+ taskToClearInjection.run();
cleanupTopics(() -> {
admin1.topics().delete(topicName);
admin2.topics().delete(topicName);
@@ -531,7 +540,7 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
// If the retry counter is larger than 6, the next creation will be
slow enough to close Replicator.
final AtomicInteger createProducerCounter = new AtomicInteger();
final int failTimes = 6;
- injectMockReplicatorProducerBuilder((producerCnf, originalProducer) ->
{
+ Runnable taskToClearInjection =
injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> {
if (topicName.equals(producerCnf.getTopicName())) {
// There is a switch to determine create producer successfully
or not.
if (createProducerCounter.incrementAndGet() > failTimes) {
@@ -593,6 +602,7 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
});
// cleanup.
+ taskToClearInjection.run();
cleanupTopics(namespaceName, () -> {
admin1.topics().delete(topicName);
admin2.topics().delete(topicName);
@@ -644,8 +654,9 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
assertTrue(replicator2.producer != null &&
replicator2.producer.isConnected());
});
- // cleanup.
+ // cleanup the injection.
persistentTopic.getProducers().remove(mockProducerName, mockProducer);
+ // cleanup.
producer1.close();
cleanupTopics(() -> {
admin1.topics().delete(topicName);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index 44d24668cc3..d523586c2e2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -49,6 +49,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
+import java.util.Map;
+import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
@@ -56,6 +58,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.LedgerHandle;
@@ -322,6 +325,83 @@ public class PersistentTopicTest extends BrokerTestBase {
}
}
+ @DataProvider(name = "closeWithoutWaitingClientDisconnectInFirstBatch")
+ public Object[][] closeWithoutWaitingClientDisconnectInFirstBatch() {
+ return new Object[][]{
+ new Object[] {true},
+ new Object[] {false},
+ };
+ }
+
+ @Test(dataProvider = "closeWithoutWaitingClientDisconnectInFirstBatch")
+ public void testConcurrentClose(boolean
closeWithoutWaitingClientDisconnectInFirstBatch) throws Exception {
+ final String topicName = "persistent://prop/ns/concurrentClose";
+ final String ns = "prop/ns";
+ admin.namespaces().createNamespace(ns, 1);
+ admin.topics().createNonPartitionedTopic(topicName);
+ final Topic topic =
pulsar.getBrokerService().getTopicIfExists(topicName).get().get();
+ List<CompletableFuture<Void>> futureList =
+ make2ConcurrentBatchesOfClose(topic, 10,
closeWithoutWaitingClientDisconnectInFirstBatch);
+ Map<Integer, List<CompletableFuture<Void>>> futureMap =
+
futureList.stream().collect(Collectors.groupingBy(Objects::hashCode));
+ /**
+ * The first call: get the return value of "topic.close".
+ * The other 19 calls: get the cached value which related {@link
PersistentTopic#closeFutures}.
+ */
+ assertTrue(futureMap.size() <= 3);
+ for (List list : futureMap.values()){
+ if (list.size() == 1){
+ // This is the first call, the future is the return value of
`topic.close`.
+ } else {
+ // Two types future list: wait client close or not.
+ assertTrue(list.size() >= 9 && list.size() <= 10);
+ }
+ }
+ }
+
+ private List<CompletableFuture<Void>> make2ConcurrentBatchesOfClose(Topic
topic, int tryTimes,
+ boolean
closeWithoutWaitingClientDisconnectInFirstBatch){
+ final List<CompletableFuture<Void>> futureList =
Collections.synchronizedList(new ArrayList<>());
+ final List<Thread> taskList = new ArrayList<>();
+ CountDownLatch allTaskBeginLatch = new CountDownLatch(1);
+ // Call a batch of close.
+ for (int i = 0; i < tryTimes; i++) {
+ Thread thread = new Thread(() -> {
+ try {
+ allTaskBeginLatch.await(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+
futureList.add(topic.close(closeWithoutWaitingClientDisconnectInFirstBatch));
+ });
+ thread.start();
+ taskList.add(thread);
+ }
+ // Call another batch of close.
+ for (int i = 0; i < tryTimes; i++) {
+ Thread thread = new Thread(() -> {
+ try {
+ allTaskBeginLatch.await(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+
futureList.add(topic.close(!closeWithoutWaitingClientDisconnectInFirstBatch));
+ });
+ thread.start();
+ taskList.add(thread);
+ }
+ // Wait close task executed.
+ allTaskBeginLatch.countDown();
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()->{
+ for (Thread thread : taskList){
+ if (thread.isAlive()){
+ return false;
+ }
+ }
+ return true;
+ });
+ return futureList;
+ }
@DataProvider(name = "topicAndMetricsLevel")
public Object[][] indexPatternTestData() {
@@ -331,7 +411,6 @@ public class PersistentTopicTest extends BrokerTestBase {
};
}
-
@Test(dataProvider = "topicAndMetricsLevel")
public void testDelayedDeliveryTrackerMemoryUsageMetric(String topic,
boolean exposeTopicLevelMetrics) throws Exception {
PulsarClient client = pulsar.getClient();
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index 6f625898535..f6fcb12f359 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.common.util;
import com.google.common.util.concurrent.MoreExecutors;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
@@ -69,6 +70,36 @@ public class FutureUtil {
})));
}
+ /**
+ * Make the dest future complete after another one. {@param dest} is will
be completed with the same value as
+ * {@param src}, or be completed with the same error as {@param src}.
+ */
+ public static <T> void completeAfter(final CompletableFuture<T> dest,
CompletableFuture<T> src) {
+ src.whenComplete((v, ex) -> {
+ if (ex != null) {
+ dest.completeExceptionally(ex);
+ } else {
+ dest.complete(v);
+ }
+ });
+ }
+
+ /**
+ * Make the dest future complete after others. {@param dest} is will be
completed with a {@link Void} value
+ * if all the futures of {@param src} is completed, or be completed
exceptionally with the same error as the first
+ * one completed exceptionally future of {@param src}.
+ */
+ public static void completeAfterAll(final CompletableFuture<Void> dest,
+ CompletableFuture<? extends Object>...
src) {
+ FutureUtil.waitForAll(Arrays.asList(src)).whenComplete((ignore, ex) ->
{
+ if (ex != null) {
+ dest.completeExceptionally(ex);
+ } else {
+ dest.complete(null);
+ }
+ });
+ }
+
/**
* Return a future that represents the completion of any future in the
provided Collection.
*
@@ -131,7 +162,7 @@ public class FutureUtil {
* @return a new CompletableFuture that is completed when all of the given
CompletableFutures complete
*/
public static CompletableFuture<Void> waitForAllAndSupportCancel(
- Collection<? extends
CompletableFuture<?>> futures) {
+ Collection<? extends CompletableFuture<?>> futures) {
CompletableFuture[] futuresArray = futures.toArray(new
CompletableFuture[0]);
CompletableFuture<Void> combinedFuture =
CompletableFuture.allOf(futuresArray);
whenCancelledOrTimedOut(combinedFuture, () -> {