This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a98593607dc [improve][broker] Make some methods of `ClusterBase` pure
async. (#15527)
a98593607dc is described below
commit a98593607dc25dc739c2558452c417b970e5b23a
Author: Qiang Zhao <[email protected]>
AuthorDate: Fri May 20 08:47:41 2022 +0800
[improve][broker] Make some methods of `ClusterBase` pure async. (#15527)
---
.../broker/resources/NamespaceResources.java | 15 +
.../pulsar/broker/admin/impl/ClustersBase.java | 303 ++++++++-------------
.../org/apache/pulsar/broker/admin/AdminTest.java | 8 +-
.../org/apache/pulsar/common/util/FutureUtil.java | 11 +
4 files changed, 143 insertions(+), 194 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
index c24df6c586f..2223e951f66 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
@@ -212,6 +212,21 @@ public class NamespaceResources extends
BaseResources<Policies> {
set(joinPath(BASE_CLUSTERS_PATH, cluster,
NAMESPACE_ISOLATION_POLICIES), modifyFunction);
}
+ public CompletableFuture<Void> setIsolationDataAsync(String cluster,
+
Function<Map<String, NamespaceIsolationDataImpl>,
+ Map<String,
NamespaceIsolationDataImpl>> modifyFunction) {
+ return setAsync(joinPath(BASE_CLUSTERS_PATH, cluster,
NAMESPACE_ISOLATION_POLICIES), modifyFunction);
+ }
+
+ public CompletableFuture<Void> setIsolationDataWithCreateAsync(String
cluster,
+
Function<Optional<Map<String,
+
NamespaceIsolationDataImpl>>,
+
Map<String, NamespaceIsolationDataImpl>>
+
createFunction) {
+ return setWithCreateAsync(joinPath(BASE_CLUSTERS_PATH, cluster,
NAMESPACE_ISOLATION_POLICIES),
+ createFunction);
+ }
+
public void setIsolationDataWithCreate(String cluster,
Function<Optional<Map<String,
NamespaceIsolationDataImpl>>, Map<String,
NamespaceIsolationDataImpl>>
createFunction)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
index 80309b2a026..a1fd8a24733 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
@@ -18,7 +18,7 @@
*/
package org.apache.pulsar.broker.admin.impl;
-import com.google.common.collect.Lists;
+import static javax.ws.rs.core.Response.Status.PRECONDITION_FAILED;
import com.google.common.collect.Maps;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
@@ -33,8 +33,8 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
@@ -46,11 +46,13 @@ import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
+import org.apache.bookkeeper.common.util.JsonUtil;
import org.apache.commons.collections4.CollectionUtils;
+import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.admin.AdminResource;
import
org.apache.pulsar.broker.resources.ClusterResources.FailureDomainResources;
import org.apache.pulsar.broker.web.RestException;
-import org.apache.pulsar.client.admin.Namespaces;
+import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.naming.Constants;
import org.apache.pulsar.common.naming.NamedEntity;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
@@ -58,12 +60,10 @@ import
org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.FailureDomainImpl;
-import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicyImpl;
import org.apache.pulsar.common.util.FutureUtil;
-import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.slf4j.Logger;
@@ -170,7 +170,7 @@ public class ClustersBase extends AdminResource {
log.error("[{}] Failed to create cluster {}",
clientAppId(), cluster, ex);
Throwable realCause =
FutureUtil.unwrapCompletionException(ex);
if (realCause instanceof IllegalArgumentException) {
- asyncResponse.resume(new
RestException(Status.PRECONDITION_FAILED,
+ asyncResponse.resume(new
RestException(PRECONDITION_FAILED,
"Cluster name is not valid"));
return null;
}
@@ -278,13 +278,13 @@ public class ClustersBase extends AdminResource {
if (CollectionUtils.isNotEmpty(peerClusterNames)) {
future =
FutureUtil.waitForAll(peerClusterNames.stream().map(peerCluster -> {
if (cluster.equalsIgnoreCase(peerCluster)) {
- return FutureUtil.failedFuture(new
RestException(Status.PRECONDITION_FAILED,
+ return FutureUtil.failedFuture(new
RestException(PRECONDITION_FAILED,
cluster + " itself can't be part of peer-list"));
}
return clusterResources().getClusterAsync(peerCluster)
.thenAccept(peerClusterOpt -> {
if (!peerClusterOpt.isPresent()) {
- throw new
RestException(Status.PRECONDITION_FAILED,
+ throw new RestException(PRECONDITION_FAILED,
"Peer cluster " + peerCluster + " does
not exist");
}
});
@@ -365,14 +365,14 @@ public class ClustersBase extends AdminResource {
return
pulsar().getPulsarResources().getClusterResources().isClusterUsedAsync(cluster)
.thenCompose(isClusterUsed -> {
if (isClusterUsed) {
- throw new RestException(Status.PRECONDITION_FAILED,
"Cluster not empty");
+ throw new RestException(PRECONDITION_FAILED, "Cluster
not empty");
}
// check the namespaceIsolationPolicies associated with
the cluster
return
namespaceIsolationPolicies().getIsolationDataPoliciesAsync(cluster);
}).thenCompose(nsIsolationPoliciesOpt -> {
if (nsIsolationPoliciesOpt.isPresent()) {
if
(!nsIsolationPoliciesOpt.get().getPolicies().isEmpty()) {
- throw new
RestException(Status.PRECONDITION_FAILED, "Cluster not empty");
+ throw new RestException(PRECONDITION_FAILED,
"Cluster not empty");
}
// Need to delete the isolation policies if present
return
namespaceIsolationPolicies().deleteIsolationDataAsync(cluster);
@@ -509,9 +509,10 @@ public class ClustersBase extends AdminResource {
});
}
+
private BrokerNamespaceIsolationData internalGetBrokerNsIsolationData(
- String
broker,
-
Map<String, NamespaceIsolationDataImpl> policies) {
+ String broker,
+ Map<String, NamespaceIsolationDataImpl> policies) {
BrokerNamespaceIsolationData.Builder brokerIsolationData =
BrokerNamespaceIsolationData.builder().brokerName(broker);
if (policies == null) {
@@ -543,49 +544,23 @@ public class ClustersBase extends AdminResource {
@ApiResponse(code = 412, message = "Cluster doesn't exist."),
@ApiResponse(code = 500, message = "Internal server error.")
})
- public BrokerNamespaceIsolationData getBrokerWithNamespaceIsolationPolicy(
- @ApiParam(
- value = "The cluster name",
- required = true
- )
+ public void getBrokerWithNamespaceIsolationPolicy(
+ @Suspended AsyncResponse asyncResponse,
+ @ApiParam(value = "The cluster name", required = true)
@PathParam("cluster") String cluster,
- @ApiParam(
- value = "The broker name (<broker-hostname>:<web-service-port>)",
- required = true,
- example = "broker1:8080"
- )
+ @ApiParam(value = "The broker name
(<broker-hostname>:<web-service-port>)", required = true,
+ example = "broker1:8080")
@PathParam("broker") String broker) {
- validateSuperUserAccess();
- validateClusterExists(cluster);
-
- Map<String, ? extends NamespaceIsolationData> nsPolicies;
- try {
- Optional<NamespaceIsolationPolicies> nsPoliciesResult =
namespaceIsolationPolicies()
- .getIsolationDataPolicies(cluster);
- if (!nsPoliciesResult.isPresent()) {
- throw new RestException(Status.NOT_FOUND, "namespace-isolation
policies not found for " + cluster);
- }
- nsPolicies = nsPoliciesResult.get().getPolicies();
- } catch (Exception e) {
- log.error("[{}] Failed to get namespace isolation-policies {}",
clientAppId(), cluster, e);
- throw new RestException(e);
- }
- BrokerNamespaceIsolationData.Builder brokerIsolationData =
BrokerNamespaceIsolationData.builder()
- .brokerName(broker);
- if (nsPolicies != null) {
- List<String> namespaceRegexes = new ArrayList<>();
- nsPolicies.forEach((name, policyData) -> {
- NamespaceIsolationPolicyImpl nsPolicyImpl = new
NamespaceIsolationPolicyImpl(policyData);
- boolean isPrimary = nsPolicyImpl.isPrimaryBroker(broker);
- if (isPrimary || nsPolicyImpl.isSecondaryBroker(broker)) {
- namespaceRegexes.addAll(policyData.getNamespaces());
- brokerIsolationData.primary(isPrimary);
- brokerIsolationData.policyName(name);
- }
- });
- brokerIsolationData.namespaceRegex(namespaceRegexes);
- }
- return brokerIsolationData.build();
+ validateSuperUserAccessAsync()
+ .thenCompose(__ -> validateClusterExistAsync(cluster,
PRECONDITION_FAILED))
+ .thenCompose(__ ->
internalGetNamespaceIsolationPolicies(cluster))
+ .thenApply(policies ->
internalGetBrokerNsIsolationData(broker, policies))
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to get namespace isolation-policies
{}", clientAppId(), cluster, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
@@ -603,150 +578,99 @@ public class ClustersBase extends AdminResource {
})
public void setNamespaceIsolationPolicy(
@Suspended final AsyncResponse asyncResponse,
- @ApiParam(
- value = "The cluster name",
- required = true
- )
+ @ApiParam(value = "The cluster name", required = true)
@PathParam("cluster") String cluster,
- @ApiParam(
- value = "The namespace isolation policy name",
- required = true
- )
+ @ApiParam(value = "The namespace isolation policy name", required =
true)
@PathParam("policyName") String policyName,
- @ApiParam(
- value = "The namespace isolation policy data",
- required = true
- )
- NamespaceIsolationDataImpl policyData
+ @ApiParam(value = "The namespace isolation policy data", required =
true)
+ NamespaceIsolationDataImpl policyData
) {
- validateSuperUserAccess();
- validateClusterExists(cluster);
- validatePoliciesReadOnlyAccess();
-
- String jsonInput = null;
- try {
- // validate the policy data before creating the node
- policyData.validate();
- jsonInput =
ObjectMapperFactory.create().writeValueAsString(policyData);
-
- NamespaceIsolationPolicies nsIsolationPolicies =
namespaceIsolationPolicies()
- .getIsolationDataPolicies(cluster).orElseGet(() -> {
+ validateSuperUserAccessAsync()
+ .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ .thenCompose(__ -> validateClusterExistAsync(cluster,
PRECONDITION_FAILED))
+ .thenCompose(__ -> {
+ // validate the policy data before creating the node
+ policyData.validate();
+ return
namespaceIsolationPolicies().getIsolationDataPoliciesAsync(cluster);
+ }).thenCompose(nsIsolationPoliciesOpt ->
+
nsIsolationPoliciesOpt.map(CompletableFuture::completedFuture)
+ .orElseGet(() -> namespaceIsolationPolicies()
+
.setIsolationDataWithCreateAsync(cluster, (p) -> Collections.emptyMap())
+ .thenApply(__ -> new
NamespaceIsolationPolicies()))
+ ).thenCompose(nsIsolationPolicies -> {
+ nsIsolationPolicies.setPolicy(policyName, policyData);
+ return namespaceIsolationPolicies()
+ .setIsolationDataAsync(cluster, old ->
nsIsolationPolicies.getPolicies());
+ }).thenCompose(__ ->
filterAndUnloadMatchedNamespaceAsync(policyData))
+ .thenAccept(__ -> {
+ log.info("[{}] Successful to update
clusters/{}/namespaceIsolationPolicies/{}.",
+ clientAppId(), cluster, policyName);
+ asyncResponse.resume(Response.noContent().build());
+ }).exceptionally(ex -> {
+ Throwable realCause =
FutureUtil.unwrapCompletionException(ex);
+ if (realCause instanceof IllegalArgumentException) {
+ String jsonData;
try {
-
namespaceIsolationPolicies().setIsolationDataWithCreate(cluster,
- (p) -> Collections.emptyMap());
- return new NamespaceIsolationPolicies();
- } catch (Exception e) {
- throw new RestException(e);
+ jsonData = JsonUtil.toJson(policyData);
+ } catch (JsonUtil.ParseJsonException e) {
+ jsonData = "[Failed to serialize]";
}
- });
-
- nsIsolationPolicies.setPolicy(policyName, policyData);
- namespaceIsolationPolicies().setIsolationData(cluster, old ->
nsIsolationPolicies.getPolicies());
-
- // whether or not make the isolation update on time.
- if
(pulsar().getConfiguration().isEnableNamespaceIsolationUpdateOnTime()) {
- filterAndUnloadMatchedNameSpaces(asyncResponse, policyData);
- } else {
- asyncResponse.resume(Response.noContent().build());
- return;
- }
- } catch (IllegalArgumentException iae) {
- log.info("[{}] Failed to update
clusters/{}/namespaceIsolationPolicies/{}. Input data is invalid",
- clientAppId(), cluster, policyName, iae);
- asyncResponse.resume(new RestException(Status.BAD_REQUEST,
- "Invalid format of input policy data. policy: " +
policyName + "; data: " + jsonInput));
- } catch (NotFoundException nne) {
- log.warn("[{}] Failed to update
clusters/{}/namespaceIsolationPolicies: Does not exist", clientAppId(),
- cluster);
- asyncResponse.resume(new RestException(Status.NOT_FOUND,
- "NamespaceIsolationPolicies for cluster " + cluster + "
does not exist"));
- } catch (Exception e) {
- log.error("[{}] Failed to update
clusters/{}/namespaceIsolationPolicies/{}", clientAppId(), cluster,
- policyName, e);
- asyncResponse.resume(new RestException(e));
- }
- }
-
- // get matched namespaces; call unload for each namespaces;
- private void filterAndUnloadMatchedNameSpaces(AsyncResponse asyncResponse,
- NamespaceIsolationDataImpl
policyData) throws Exception {
- Namespaces namespaces = pulsar().getAdminClient().namespaces();
-
- List<String> nssToUnload = Lists.newArrayList();
-
- pulsar().getAdminClient().tenants().getTenantsAsync()
- .whenComplete((tenants, ex) -> {
- if (ex != null) {
- log.error("[{}] Failed to get tenants when
setNamespaceIsolationPolicy.", clientAppId(), ex);
- return;
- }
- AtomicInteger tenantsNumber = new
AtomicInteger(tenants.size());
- // get all tenants now, for each tenants, get its namespaces
- tenants.forEach(tenant -> namespaces.getNamespacesAsync(tenant)
- .whenComplete((nss, e) -> {
- int leftTenantsToHandle =
tenantsNumber.decrementAndGet();
- if (e != null) {
- log.error("[{}] Failed to get namespaces for
tenant {} when setNamespaceIsolationPolicy.",
- clientAppId(), tenant, e);
-
- if (leftTenantsToHandle == 0) {
- unloadMatchedNamespacesList(asyncResponse,
nssToUnload, namespaces);
- }
-
- return;
- }
-
- AtomicInteger nssNumber = new
AtomicInteger(nss.size());
-
- // get all namespaces for this tenant now.
- nss.forEach(namespaceName -> {
- int leftNssToHandle = nssNumber.decrementAndGet();
-
- // if namespace match any policy regex, add it to
ns list to be unload.
- if (policyData.getNamespaces().stream()
- .anyMatch(nsnameRegex ->
namespaceName.matches(nsnameRegex))) {
- nssToUnload.add(namespaceName);
- }
-
- // all the tenants & namespaces get filtered.
- if (leftNssToHandle == 0 && leftTenantsToHandle ==
0) {
- unloadMatchedNamespacesList(asyncResponse,
nssToUnload, namespaces);
- }
- });
- }));
- });
+ asyncResponse.resume(new
RestException(Status.BAD_REQUEST,
+ "Invalid format of input policy data. policy:
" + policyName + "; data: " + jsonData));
+ return null;
+ } else if (realCause instanceof NotFoundException) {
+ log.warn("[{}] Failed to update
clusters/{}/namespaceIsolationPolicies: Does not exist",
+ clientAppId(), cluster);
+ asyncResponse.resume(new
RestException(Status.NOT_FOUND,
+ "NamespaceIsolationPolicies for cluster " +
cluster + " does not exist"));
+ return null;
+ }
+ log.info("[{}] Failed to update
clusters/{}/namespaceIsolationPolicies/{}. Input data is invalid",
+ clientAppId(), cluster, policyName, realCause);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
- private void unloadMatchedNamespacesList(AsyncResponse asyncResponse,
- List<String> nssToUnload,
- Namespaces namespaces) {
- if (nssToUnload.size() == 0) {
- asyncResponse.resume(Response.noContent().build());
- return;
+ /**
+ * Get matched namespaces; call unload for each namespaces.
+ */
+ private CompletableFuture<Void>
filterAndUnloadMatchedNamespaceAsync(NamespaceIsolationDataImpl policyData) {
+ PulsarAdmin adminClient;
+ try {
+ adminClient = pulsar().getAdminClient();
+ } catch (PulsarServerException e) {
+ return FutureUtil.failedFuture(e);
}
-
- List<CompletableFuture<Void>> futures = nssToUnload.stream()
- .map(namespaceName -> namespaces.unloadAsync(namespaceName))
- .collect(Collectors.toList());
-
- FutureUtil.waitForAll(futures).whenComplete((result, exception) -> {
- if (exception != null) {
- log.error("[{}] Failed to unload namespace while
setNamespaceIsolationPolicy.",
- clientAppId(), exception);
- asyncResponse.resume(new RestException(exception));
- return;
- }
-
- try {
- // write load info to load manager to make the load happens
fast
-
pulsar().getLoadManager().get().writeLoadReportOnZookeeper(true);
- } catch (Exception e) {
- log.warn("[{}] Failed to writeLoadReportOnZookeeper.",
clientAppId(), e);
- }
-
- asyncResponse.resume(Response.noContent().build());
- return;
- });
+ return adminClient.tenants().getTenantsAsync()
+ .thenCompose(tenants -> {
+ Stream<CompletableFuture<List<String>>>
completableFutureStream = tenants.stream()
+ .map(tenant ->
adminClient.namespaces().getNamespacesAsync(tenant));
+ return FutureUtil.waitForAll(completableFutureStream)
+ .thenApply(namespaces -> {
+ // if namespace match any policy regex, add it
to ns list to be unload.
+ return namespaces.stream()
+ .filter(namespaceName ->
+
policyData.getNamespaces().stream().anyMatch(namespaceName::matches))
+ .collect(Collectors.toList());
+ });
+ }).thenCompose(shouldUnloadNamespaces -> {
+ if (CollectionUtils.isEmpty(shouldUnloadNamespaces)) {
+ return CompletableFuture.completedFuture(null);
+ }
+ List<CompletableFuture<Void>> futures =
shouldUnloadNamespaces.stream()
+ .map(namespaceName ->
adminClient.namespaces().unloadAsync(namespaceName))
+ .collect(Collectors.toList());
+ return FutureUtil.waitForAll(futures)
+ .thenAccept(__ -> {
+ try {
+ // write load info to load manager to make
the load happens fast
+
pulsar().getLoadManager().get().writeLoadReportOnZookeeper(true);
+ } catch (Exception e) {
+ log.warn("[{}] Failed to
writeLoadReportOnZookeeper.", clientAppId(), e);
+ }
+ });
+ });
}
@DELETE
@@ -1012,5 +936,4 @@ public class ClustersBase extends AdminResource {
}
private static final Logger log =
LoggerFactory.getLogger(ClustersBase.class);
-
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 3f28f9d8e44..ff41edfb48b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -253,8 +253,8 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
.parameters(parameters1)
.build())
.build();
- AsyncResponse response = mock(AsyncResponse.class);
- clusters.setNamespaceIsolationPolicy(response,"use", "policy1",
policyData);
+ asyncRequests(ctx -> clusters.setNamespaceIsolationPolicy(ctx,
+ "use", "policy1", policyData));
asyncRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx,
"use"));
try {
@@ -403,8 +403,8 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.PRECONDITION_FAILED.getStatusCode());
}
- verify(clusters, times(22)).validateSuperUserAccessAsync();
- verify(clusters, times(2)).validateSuperUserAccess();
+ verify(clusters, times(23)).validateSuperUserAccessAsync();
+ verify(clusters, times(1)).validateSuperUserAccess();
}
@Test
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index e5c2caeb7d0..afad61eb669 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -19,7 +19,9 @@
package org.apache.pulsar.common.util;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -31,6 +33,7 @@ import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
* This class is aimed at simplifying work with {@code CompletableFuture}.
@@ -47,6 +50,14 @@ public class FutureUtil {
return CompletableFuture.allOf(futures.toArray(new
CompletableFuture[0]));
}
+ public static <T> CompletableFuture<List<T>>
waitForAll(Stream<CompletableFuture<List<T>>> futures) {
+ return futures.reduce(CompletableFuture.completedFuture(new
ArrayList<>()),
+ (pre, curr) -> pre.thenCompose(preV -> curr.thenApply(currV ->
{
+ preV.addAll(currV);
+ return preV;
+ })));
+ }
+
/**
* Return a future that represents the completion of any future in the
provided Collection.
*