This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new f45f03ed3f IGNITE-22518 Rename CompletableFutures#allOf to
CompletableFutures#allOfToList (#3932)
f45f03ed3f is described below
commit f45f03ed3f6256b8b9b2982ba7cce2b860e171d1
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Mon Jun 17 13:24:30 2024 +0300
IGNITE-22518 Rename CompletableFutures#allOf to
CompletableFutures#allOfToList (#3932)
---
.../client/compute/ClientTaskExecution.java | 4 +-
.../ignite/internal/compute/ExecutionManager.java | 4 +-
.../internal/compute/loader/JobContextManager.java | 2 +-
.../compute/messaging/ComputeMessaging.java | 4 +-
.../compute/task/TaskExecutionInternal.java | 4 +-
.../ignite/internal/util/CompletableFutures.java | 12 ++++-
.../internal/util/CompletableFuturesTest.java | 54 +++++++++++++++++++++-
.../runner/app/AbstractSchemaChangeTest.java | 4 +-
.../internal/table/distributed/TableManager.java | 3 +-
.../replicator/PartitionReplicaListener.java | 6 +--
.../tx/impl/PersistentTxStateVacuumizer.java | 2 +-
.../internal/tx/impl/TransactionInflights.java | 4 +-
12 files changed, 82 insertions(+), 21 deletions(-)
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientTaskExecution.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientTaskExecution.java
index 9df7d73b6a..c058b7d2b8 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientTaskExecution.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientTaskExecution.java
@@ -21,6 +21,7 @@ import static
org.apache.ignite.internal.client.compute.ClientJobExecution.cance
import static
org.apache.ignite.internal.client.compute.ClientJobExecution.changePriority;
import static
org.apache.ignite.internal.client.compute.ClientJobExecution.getJobStatus;
import static
org.apache.ignite.internal.client.compute.ClientJobExecution.unpackJobStatus;
+import static org.apache.ignite.internal.util.CompletableFutures.allOfToList;
import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
import java.util.ArrayList;
@@ -33,7 +34,6 @@ import org.apache.ignite.compute.JobStatus;
import org.apache.ignite.compute.TaskExecution;
import org.apache.ignite.internal.client.PayloadInputChannel;
import org.apache.ignite.internal.client.ReliableChannel;
-import org.apache.ignite.internal.util.CompletableFutures;
import org.jetbrains.annotations.Nullable;
/**
@@ -116,7 +116,7 @@ class ClientTaskExecution<R> implements TaskExecution<R> {
.map(jobId -> getJobStatus(ch, jobId))
.toArray(CompletableFuture[]::new);
- return CompletableFutures.allOf(futures)
+ return allOfToList(futures)
.thenApply(Function.identity());
});
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionManager.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionManager.java
index 098ae78af5..3c088f0cc7 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionManager.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionManager.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.compute;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.util.CompletableFutures.allOfToList;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.lang.ErrorGroups.Compute.RESULT_NOT_FOUND_ERR;
@@ -35,7 +36,6 @@ import org.apache.ignite.compute.JobStatus;
import org.apache.ignite.internal.compute.configuration.ComputeConfiguration;
import org.apache.ignite.internal.compute.messaging.RemoteJobExecution;
import org.apache.ignite.internal.network.TopologyService;
-import org.apache.ignite.internal.util.CompletableFutures;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -108,7 +108,7 @@ public class ExecutionManager {
.map(JobExecution::statusAsync)
.toArray(CompletableFuture[]::new);
- return CompletableFutures.allOf(statusFutures)
+ return allOfToList(statusFutures)
.thenApply(statuses -> statuses.stream()
.filter(Objects::nonNull)
.collect(toList()));
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/loader/JobContextManager.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/loader/JobContextManager.java
index 012c1c9c14..2d6646d6f6 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/loader/JobContextManager.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/loader/JobContextManager.java
@@ -148,7 +148,7 @@ public class JobContextManager {
}
private CompletableFuture<List<DeploymentUnit>>
normalizeVersions(List<DeploymentUnit> units) {
- return mapList(units, this::normalizeVersion,
CompletableFutures::allOf);
+ return mapList(units, this::normalizeVersion,
CompletableFutures::allOfToList);
}
private CompletableFuture<Void> onDemandDeploy(List<DeploymentUnit> units)
{
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
index 453e72917d..960064a923 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/messaging/ComputeMessaging.java
@@ -27,6 +27,7 @@ import static
org.apache.ignite.internal.compute.ComputeUtils.resultFromJobResul
import static
org.apache.ignite.internal.compute.ComputeUtils.statusFromJobStatusResponse;
import static
org.apache.ignite.internal.compute.ComputeUtils.statusesFromJobStatusesResponse;
import static org.apache.ignite.internal.compute.ComputeUtils.toDeploymentUnit;
+import static org.apache.ignite.internal.util.CompletableFutures.allOfToList;
import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
import static org.apache.ignite.lang.ErrorGroups.Compute.CANCELLING_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Compute.CHANGE_JOB_PRIORITY_ERR;
@@ -66,7 +67,6 @@ import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.TopologyService;
-import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
@@ -492,7 +492,7 @@ public class ComputeMessaging {
.map(request::apply)
.toArray(CompletableFuture[]::new);
- return CompletableFutures.allOf(futures).exceptionally(throwable -> {
+ return allOfToList(futures).exceptionally(throwable -> {
throw error.apply(throwable);
});
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
index 0b842d1974..ab462224a8 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
@@ -26,6 +26,7 @@ import static org.apache.ignite.compute.JobState.EXECUTING;
import static org.apache.ignite.compute.JobState.FAILED;
import static org.apache.ignite.internal.compute.ComputeUtils.instantiateTask;
import static org.apache.ignite.internal.util.ArrayUtils.concat;
+import static org.apache.ignite.internal.util.CompletableFutures.allOfToList;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
@@ -49,7 +50,6 @@ import
org.apache.ignite.internal.compute.queue.PriorityQueueExecutor;
import org.apache.ignite.internal.compute.queue.QueueExecution;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.util.CompletableFutures;
import org.jetbrains.annotations.Nullable;
/**
@@ -242,7 +242,7 @@ public class TaskExecutionInternal<R> implements
JobExecution<R> {
.map(JobExecution::statusAsync)
.toArray(CompletableFuture[]::new);
- return CompletableFutures.allOf(statusFutures);
+ return allOfToList(statusFutures);
});
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/CompletableFutures.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/CompletableFutures.java
index f02fff4ca7..8d2fca1ff0 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/CompletableFutures.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/CompletableFutures.java
@@ -97,7 +97,7 @@ public class CompletableFutures {
* @return Future that completes with a list of results from the source
futures.
*/
@SafeVarargs
- public static <T> CompletableFuture<List<T>> allOf(CompletableFuture<T>...
cfs) {
+ public static <T> CompletableFuture<List<T>>
allOfToList(CompletableFuture<T>... cfs) {
return CompletableFuture.allOf(cfs)
.thenApply(v -> {
var result = new ArrayList<T>(cfs.length);
@@ -110,6 +110,16 @@ public class CompletableFutures {
});
}
+ /**
+ * Returns a future that is completed when all provided futures complete
(the behavior is identical to
+ * {@link CompletableFuture#allOf}).
+ *
+ * @param futures Collection of futures.
+ */
+ public static CompletableFuture<Void>
allOf(Collection<CompletableFuture<?>> futures) {
+ return futures.isEmpty() ? nullCompletedFuture() :
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new));
+ }
+
/**
* Returns {@code true} if the future is completed successfully (so it's
not failed or cancelled).
*
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/util/CompletableFuturesTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/CompletableFuturesTest.java
index f311b3ad93..4d6e5556cb 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/util/CompletableFuturesTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/util/CompletableFuturesTest.java
@@ -23,6 +23,7 @@ import static
org.apache.ignite.internal.testframework.asserts.CompletableFuture
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.will;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.apache.ignite.internal.util.CompletableFutures.allOfToList;
import static
org.apache.ignite.internal.util.CompletableFutures.booleanCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.completedOrFailedFuture;
import static org.apache.ignite.internal.util.CompletableFutures.copyStateTo;
@@ -129,7 +130,7 @@ public class CompletableFuturesTest {
@Test
void testAllOfSuccessFuture() {
- CompletableFuture<List<Integer>> future = CompletableFutures.allOf(
+ CompletableFuture<List<Integer>> future = allOfToList(
nullCompletedFuture(),
completedFuture(1),
completedFuture(42)
@@ -140,7 +141,7 @@ public class CompletableFuturesTest {
@Test
void testAllFailedFuture() {
- CompletableFuture<List<Integer>> future = CompletableFutures.allOf(
+ CompletableFuture<List<Integer>> future = allOfToList(
nullCompletedFuture(),
failedFuture(new RuntimeException("test error")),
completedFuture(42)
@@ -190,4 +191,53 @@ public class CompletableFuturesTest {
assertThat(future1, willBe("test"));
}
+
+ @Test
+ void testAllOfEmpty() {
+ CompletableFuture<Void> allOfFuture0 =
CompletableFutures.allOf(List.of());
+ CompletableFuture<Void> allOfFuture1 =
CompletableFutures.allOf(Set.of());
+
+ assertTrue(allOfFuture0.isDone());
+ assertTrue(allOfFuture1.isDone());
+
+ assertThat(allOfFuture0, willBe(nullValue()));
+ assertThat(allOfFuture1, willBe(nullValue()));
+ }
+
+ @Test
+ void testAllOfSuccessfully() {
+ var future0 = new CompletableFuture<String>();
+ var future1 = new CompletableFuture<Integer>();
+
+ CompletableFuture<Void> allOfFuture =
CompletableFutures.allOf(Set.of(future0, future1));
+ assertFalse(allOfFuture.isDone());
+
+ future1.complete(1);
+ assertFalse(allOfFuture.isDone());
+
+ future0.complete("test");
+ assertTrue(allOfFuture.isDone());
+
+ assertThat(allOfFuture, willBe(nullValue()));
+ }
+
+ @Test
+ void testAllOfFailed() {
+ var future0 = new CompletableFuture<String>();
+ var future1 = new CompletableFuture<Integer>();
+
+ CompletableFuture<Void> allOfFuture =
CompletableFutures.allOf(List.of(future0, future1));
+ assertFalse(allOfFuture.isDone());
+
+ var exception0 = new Exception("from test 0");
+ var exception1 = new Exception("from test 1");
+
+ future0.completeExceptionally(exception0);
+ assertFalse(allOfFuture.isDone());
+
+ future1.completeExceptionally(exception1);
+ assertTrue(allOfFuture.isDone());
+
+ assertThat(allOfFuture, willThrow(Exception.class, "from test 0"));
+ }
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/AbstractSchemaChangeTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/AbstractSchemaChangeTest.java
index e2e5c1a548..8f4df419af 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/AbstractSchemaChangeTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/AbstractSchemaChangeTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.runner.app;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.util.CompletableFutures.allOfToList;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;
@@ -34,7 +35,6 @@ import org.apache.ignite.InitParameters;
import org.apache.ignite.internal.IgniteIntegrationTest;
import org.apache.ignite.internal.testframework.TestIgnitionManager;
import org.apache.ignite.internal.testframework.WorkDirectory;
-import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteException;
import org.junit.jupiter.api.AfterEach;
@@ -141,7 +141,7 @@ abstract class AbstractSchemaChangeTest extends
IgniteIntegrationTest {
TestIgnitionManager.init(initParameters);
- return await(CompletableFutures.allOf(futures));
+ return await(allOfToList(futures));
}
/**
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 0e211d2be4..c9b7e4927d 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -58,6 +58,7 @@ import static
org.apache.ignite.internal.table.distributed.index.IndexUtils.regi
import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE;
import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.apache.ignite.internal.util.CompletableFutures.allOfToList;
import static
org.apache.ignite.internal.util.CompletableFutures.emptyListCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -1529,7 +1530,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
.map(tableDescriptor ->
tableAsyncInternalBusy(tableDescriptor.id()))
.toArray(CompletableFuture[]::new);
- return CompletableFutures.allOf(tableImplFutures);
+ return allOfToList(tableImplFutures);
}));
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 3b87474e45..9089782324 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -38,6 +38,7 @@ import static org.apache.ignite.internal.tx.TxState.FINISHING;
import static org.apache.ignite.internal.tx.TxState.PENDING;
import static org.apache.ignite.internal.tx.TxState.isFinalState;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+import static org.apache.ignite.internal.util.CompletableFutures.allOfToList;
import static
org.apache.ignite.internal.util.CompletableFutures.emptyCollectionCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.emptyListCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
@@ -186,7 +187,6 @@ import
org.apache.ignite.internal.tx.message.VacuumTxStatesCommand;
import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicatedInfo;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
-import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.CursorUtils;
import org.apache.ignite.internal.util.ExceptionUtils;
@@ -1110,7 +1110,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
resolutionFuts[i] =
resolveRowByPkForReadOnly(primaryKeys.get(i), readTimestamp);
}
- return CompletableFutures.allOf(resolutionFuts);
+ return allOfToList(resolutionFuts);
});
}
@@ -2223,7 +2223,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
resolutionFuts[i] = resolveRowByPkForReadOnly(primaryKeys.get(i),
readTimestamp);
}
- return CompletableFutures.allOf(resolutionFuts);
+ return allOfToList(resolutionFuts);
}
/**
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java
index 4ef589a24f..7ad8dd8152 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java
@@ -142,7 +142,7 @@ public class PersistentTxStateVacuumizer {
futures.add(future);
});
- return allOf(futures.toArray(new CompletableFuture[0]))
+ return allOf(futures)
.handle((unused, unusedEx) -> new
PersistentTxStateVacuumResult(successful,
vacuumizedPersistentTxnStatesCount.get()));
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
index 2212c2c642..58fa141a32 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
@@ -21,7 +21,7 @@ import static
java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static org.apache.ignite.internal.tx.TxState.ABORTED;
-import static org.apache.ignite.internal.util.CompletableFutures.allOf;
+import static org.apache.ignite.internal.util.CompletableFutures.allOfToList;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_PRIMARY_REPLICA_EXPIRED_ERR;
@@ -300,7 +300,7 @@ public class TransactionInflights {
});
}
- return allOf(futures)
+ return allOfToList(futures)
.thenCompose(unused -> waitNoInflights());
} else {
return nullCompletedFuture();