This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 9b59ef6cc5 IGNITE-20645 Make ComputeJob.execute asynchronous (#3920)
9b59ef6cc5 is described below
commit 9b59ef6cc5974757628f3e2b83185c8430f81a01
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Mon Jun 17 17:59:52 2024 +0300
IGNITE-20645 Make ComputeJob.execute asynchronous (#3920)
* Change `R ComputeJob.execute(...)` to `CompletableFuture<R>
ComputeJob.executeAsync(...)`
* Update cancellation logic to deal with the resulting future
---
.../java/org/apache/ignite/compute/ComputeJob.java | 8 ++-
.../org/apache/ignite/compute/IgniteCompute.java | 2 +-
...ClientStreamerWithReceiverBatchSendRequest.java | 6 +-
.../apache/ignite/client/fakes/FakeCompute.java | 18 +++--
.../internal/compute/ItComputeTestEmbedded.java | 10 +--
.../threading/ItComputeApiThreadingTest.java | 5 +-
.../internal/compute/utils/InteractiveJobs.java | 14 ++--
.../apache/ignite/internal/compute/ConcatJob.java | 9 ++-
.../apache/ignite/internal/compute/FailingJob.java | 3 +-
.../ignite/internal/compute/GetNodeNameJob.java | 7 +-
.../internal/compute/NonEmptyConstructorJob.java | 7 +-
.../apache/ignite/internal/compute/SleepJob.java | 3 +-
.../compute/executor/ComputeExecutorImpl.java | 2 +-
.../compute/queue/PriorityQueueExecutor.java | 5 +-
.../ignite/internal/compute/queue/QueueEntry.java | 25 +++++--
.../internal/compute/queue/QueueExecutionImpl.java | 7 +-
.../compute/task/TaskExecutionInternal.java | 5 +-
.../internal/compute/ComputeComponentImplTest.java | 12 ++--
.../compute/executor/ComputeExecutorTest.java | 24 +++----
.../compute/loader/JobClassLoaderFactoryTest.java | 15 +++--
.../compute/queue/PriorityQueueExecutorTest.java | 23 +++----
.../apache/ignite/internal/compute/UnitJob.java | 7 +-
.../apache/ignite/internal/compute/UnitJob.java | 7 +-
.../Apache.Ignite.Tests/Compute/ComputeTests.cs | 2 +-
.../rest/compute/ItComputeControllerTest.java | 3 +-
.../runner/app/PlatformTestNodeRunner.java | 23 +++----
.../runner/app/client/ItThinClientComputeTest.java | 76 +++++++++++++++-------
.../client/ItThinClientPartitionAwarenessTest.java | 7 +-
28 files changed, 211 insertions(+), 124 deletions(-)
diff --git
a/modules/api/src/main/java/org/apache/ignite/compute/ComputeJob.java
b/modules/api/src/main/java/org/apache/ignite/compute/ComputeJob.java
index 515e902bd3..97db8b05ea 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/ComputeJob.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/ComputeJob.java
@@ -17,18 +17,22 @@
package org.apache.ignite.compute;
+import java.util.concurrent.CompletableFuture;
+import org.jetbrains.annotations.Nullable;
+
/**
* A Compute job that may be executed on a single Ignite node, on several
nodes, or on the entire cluster.
*
* @param <R> Job result type.
*/
+@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
public interface ComputeJob<R> {
/**
* Executes the job on an Ignite node.
*
* @param context The execution context.
* @param args Job arguments.
- * @return Job result.
+ * @return Job future. Can be null if the job is synchronous and does not
return any result.
*/
- R execute(JobExecutionContext context, Object... args);
+ @Nullable CompletableFuture<R> executeAsync(JobExecutionContext context,
Object... args);
}
diff --git
a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
index 54b23d478c..c0fb02869d 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
@@ -37,7 +37,7 @@ import org.apache.ignite.table.mapper.Mapper;
* Provides the ability to execute Compute jobs.
*
* @see ComputeJob
- * @see ComputeJob#execute(JobExecutionContext, Object...)
+ * @see ComputeJob#executeAsync(JobExecutionContext, Object...)
*/
public interface IgniteCompute {
/**
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientStreamerWithReceiverBatchSendRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientStreamerWithReceiverBatchSendRequest.java
index 48078c7c75..46967bd119 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientStreamerWithReceiverBatchSendRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientStreamerWithReceiverBatchSendRequest.java
@@ -102,7 +102,7 @@ public class ClientStreamerWithReceiverBatchSendRequest {
private static class ReceiverRunnerJob implements ComputeJob<List<Object>>
{
@Override
- public @Nullable List<Object> execute(JobExecutionContext context,
Object... args) {
+ public @Nullable CompletableFuture<List<Object>>
executeAsync(JobExecutionContext context, Object... args) {
int payloadElementCount = (int) args[0];
byte[] payload = (byte[]) args[1];
@@ -113,9 +113,7 @@ public class ClientStreamerWithReceiverBatchSendRequest {
DataStreamerReceiver<Object, Object> receiver =
ComputeUtils.instantiateReceiver(receiverClass);
DataStreamerReceiverContext receiverContext = context::ignite;
- CompletableFuture<List<Object>> receiveFut =
receiver.receive(receiverInfo.items(), receiverContext, receiverInfo.args());
-
- return receiveFut == null ? null : receiveFut.join();
+ return receiver.receive(receiverInfo.items(), receiverContext,
receiverInfo.args());
}
}
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
index cc3e9bd366..0fa44c7007 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
@@ -21,6 +21,7 @@ import static
java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.compute.JobState.COMPLETED;
import static org.apache.ignite.compute.JobState.EXECUTING;
import static org.apache.ignite.compute.JobState.FAILED;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
import java.time.Instant;
@@ -100,19 +101,22 @@ public class FakeCompute implements IgniteComputeInternal
{
throw new RuntimeException(e);
}
- if (err != null) {
- throw err;
+ var err0 = err;
+ if (err0 != null) {
+ throw err0;
}
if (jobClassName.startsWith("org.apache.ignite")) {
- Class<ComputeJob<Object>> jobClass =
ComputeUtils.jobClass(this.getClass().getClassLoader(), jobClassName);
- ComputeJob<Object> job = ComputeUtils.instantiateJob(jobClass);
- Object jobRes = job.execute(new JobExecutionContextImpl(ignite,
new AtomicBoolean(), this.getClass().getClassLoader()), args);
+ Class<ComputeJob<R>> jobClass =
ComputeUtils.jobClass(this.getClass().getClassLoader(), jobClassName);
+ ComputeJob<R> job = ComputeUtils.instantiateJob(jobClass);
+ CompletableFuture<R> jobFut = job.executeAsync(
+ new JobExecutionContextImpl(ignite, new AtomicBoolean(),
this.getClass().getClassLoader()), args);
- return jobExecution(completedFuture((R) jobRes));
+ return jobExecution(jobFut != null ? jobFut :
nullCompletedFuture());
}
- return jobExecution(future != null ? future : completedFuture((R)
nodeName));
+ var future0 = future;
+ return jobExecution(future0 != null ? future0 : completedFuture((R)
nodeName));
}
/** {@inheritDoc} */
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
index 7ba202c790..4aed1cf525 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
@@ -341,14 +341,14 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
private static class CustomFailingJob implements ComputeJob<String> {
@Override
- public String execute(JobExecutionContext context, Object... args) {
+ public CompletableFuture<String> executeAsync(JobExecutionContext
context, Object... args) {
throw ExceptionUtils.sneakyThrow((Throwable) args[0]);
}
}
private static class WaitLatchJob implements ComputeJob<String> {
@Override
- public String execute(JobExecutionContext context, Object... args) {
+ public CompletableFuture<String> executeAsync(JobExecutionContext
context, Object... args) {
try {
((CountDownLatch) args[0]).await();
} catch (InterruptedException e) {
@@ -362,7 +362,7 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
static final AtomicInteger counter = new AtomicInteger(0);
@Override
- public String execute(JobExecutionContext context, Object... args) {
+ public CompletableFuture<String> executeAsync(JobExecutionContext
context, Object... args) {
try {
((CountDownLatch) args[0]).await();
if (counter.incrementAndGet() == 1) {
@@ -377,7 +377,7 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
private static class PerformSyncKvGetPutJob implements ComputeJob<Void> {
@Override
- public Void execute(JobExecutionContext context, Object... args) {
+ public CompletableFuture<Void> executeAsync(JobExecutionContext
context, Object... args) {
Table table = context.ignite().tables().table("test");
KeyValueView<Integer, Integer> view =
table.keyValueView(Integer.class, Integer.class);
@@ -390,7 +390,7 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
private static class NullReturningJob implements ComputeJob<Void> {
@Override
- public Void execute(JobExecutionContext context, Object... args) {
+ public CompletableFuture<Void> executeAsync(JobExecutionContext
context, Object... args) {
return null;
}
}
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/threading/ItComputeApiThreadingTest.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/threading/ItComputeApiThreadingTest.java
index aabc6db098..d19788588a 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/threading/ItComputeApiThreadingTest.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/threading/ItComputeApiThreadingTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.compute.threading;
import static java.lang.Thread.currentThread;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static
org.apache.ignite.internal.PublicApiThreadingTests.anIgniteThread;
import static
org.apache.ignite.internal.PublicApiThreadingTests.asyncContinuationPool;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
@@ -141,8 +142,8 @@ class ItComputeApiThreadingTest extends
ClusterPerClassIntegrationTest {
private static class NoOpJob implements ComputeJob<String> {
@Override
- public String execute(JobExecutionContext context, Object... args) {
- return "ok";
+ public CompletableFuture<String> executeAsync(JobExecutionContext
context, Object... args) {
+ return completedFuture("ok");
}
}
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java
index 1d5301504f..25d5022ea5 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.compute.utils;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
@@ -25,6 +26,7 @@ import static org.hamcrest.Matchers.notNullValue;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -163,7 +165,7 @@ public final class InteractiveJobs {
}
@Override
- public String execute(JobExecutionContext context, Object... args) {
+ public CompletableFuture<String> executeAsync(JobExecutionContext
context, Object... args) {
RUNNING_INTERACTIVE_JOBS_CNT.incrementAndGet();
offerArgsAsSignals(args);
@@ -178,9 +180,9 @@ public final class InteractiveJobs {
GLOBAL_CHANNEL.offer(ACK);
break;
case RETURN:
- return "Done";
+ return completedFuture("Done");
case RETURN_WORKER_NAME:
- return context.ignite().name();
+ return completedFuture(context.ignite().name());
case GET_WORKER_NAME:
GLOBAL_CHANNEL.add(context.ignite().name());
break;
@@ -226,7 +228,7 @@ public final class InteractiveJobs {
}
@Override
- public String execute(JobExecutionContext context, Object... args) {
+ public CompletableFuture<String> executeAsync(JobExecutionContext
context, Object... args) {
RUNNING_INTERACTIVE_JOBS_CNT.incrementAndGet();
try {
@@ -244,9 +246,9 @@ public final class InteractiveJobs {
NODE_CHANNELS.get(workerNodeName).offer(ACK);
break;
case RETURN:
- return "Done";
+ return completedFuture("Done");
case RETURN_WORKER_NAME:
- return workerNodeName;
+ return completedFuture(workerNodeName);
case GET_WORKER_NAME:
NODE_CHANNELS.get(workerNodeName).add(workerNodeName);
break;
diff --git
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/ConcatJob.java
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/ConcatJob.java
index 81decb7f4a..b235ad0fea 100644
---
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/ConcatJob.java
+++
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/ConcatJob.java
@@ -17,7 +17,10 @@
package org.apache.ignite.internal.compute;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.JobExecutionContext;
@@ -25,9 +28,9 @@ import org.apache.ignite.compute.JobExecutionContext;
/** Compute job that concatenates the string representation of its arguments.
*/
public class ConcatJob implements ComputeJob<String> {
@Override
- public String execute(JobExecutionContext context, Object... args) {
- return Arrays.stream(args)
+ public CompletableFuture<String> executeAsync(JobExecutionContext context,
Object... args) {
+ return completedFuture(Arrays.stream(args)
.map(Object::toString)
- .collect(Collectors.joining());
+ .collect(Collectors.joining()));
}
}
diff --git
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/FailingJob.java
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/FailingJob.java
index 4286c56bb7..20497cf23d 100644
---
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/FailingJob.java
+++
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/FailingJob.java
@@ -17,13 +17,14 @@
package org.apache.ignite.internal.compute;
+import java.util.concurrent.CompletableFuture;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.JobExecutionContext;
/** Compute job that always fails with the {@link JobException}. */
public class FailingJob implements ComputeJob<String> {
@Override
- public String execute(JobExecutionContext context, Object... args) {
+ public CompletableFuture<String> executeAsync(JobExecutionContext context,
Object... args) {
throw new JobException("Oops", new Exception());
}
}
diff --git
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/GetNodeNameJob.java
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/GetNodeNameJob.java
index 558077adf0..226fbf83b3 100644
---
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/GetNodeNameJob.java
+++
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/GetNodeNameJob.java
@@ -17,13 +17,16 @@
package org.apache.ignite.internal.compute;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.util.concurrent.CompletableFuture;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.JobExecutionContext;
/** Compute job that returns the node name. */
public class GetNodeNameJob implements ComputeJob<String> {
@Override
- public String execute(JobExecutionContext context, Object... args) {
- return context.ignite().name();
+ public CompletableFuture<String> executeAsync(JobExecutionContext context,
Object... args) {
+ return completedFuture(context.ignite().name());
}
}
diff --git
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/NonEmptyConstructorJob.java
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/NonEmptyConstructorJob.java
index 7aaa6ecf6b..01721fd2fa 100644
---
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/NonEmptyConstructorJob.java
+++
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/NonEmptyConstructorJob.java
@@ -17,6 +17,9 @@
package org.apache.ignite.internal.compute;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.util.concurrent.CompletableFuture;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.JobExecutionContext;
@@ -27,7 +30,7 @@ public class NonEmptyConstructorJob implements
ComputeJob<String> {
/** {@inheritDoc} */
@Override
- public String execute(JobExecutionContext context, Object... args) {
- return "";
+ public CompletableFuture<String> executeAsync(JobExecutionContext context,
Object... args) {
+ return completedFuture("");
}
}
diff --git
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/SleepJob.java
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/SleepJob.java
index 0dd32099c1..9c8c2592bb 100644
---
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/SleepJob.java
+++
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/SleepJob.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.compute;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.JobExecutionContext;
@@ -24,7 +25,7 @@ import org.apache.ignite.compute.JobExecutionContext;
/** Compute job that sleeps for a number of milliseconds passed in the
argument. */
public class SleepJob implements ComputeJob<Void> {
@Override
- public Void execute(JobExecutionContext jobExecutionContext, Object...
args) {
+ public CompletableFuture<Void> executeAsync(JobExecutionContext
jobExecutionContext, Object... args) {
try {
TimeUnit.SECONDS.sleep((Long) args[0]);
return null;
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
index efb9baa5e1..acb0c231ab 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
@@ -83,7 +83,7 @@ public class ComputeExecutorImpl implements ComputeExecutor {
JobExecutionContext context = new JobExecutionContextImpl(ignite,
isInterrupted, classLoader);
QueueExecution<R> execution = executorService.submit(
- () -> ComputeUtils.instantiateJob(jobClass).execute(context,
args),
+ () ->
ComputeUtils.instantiateJob(jobClass).executeAsync(context, args),
options.priority(),
options.maxRetries()
);
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java
index 7823cf0524..3eb2fa65f9 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java
@@ -21,6 +21,7 @@ import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.compute.configuration.ComputeConfiguration;
@@ -71,7 +72,7 @@ public class PriorityQueueExecutor {
* @param maxRetries Number of retries of the execution after failure,
{@code 0} means the execution will not be retried.
* @return Completable future which will be finished when compute job
finished.
*/
- public <R> QueueExecution<R> submit(Callable<R> job, int priority, int
maxRetries) {
+ public <R> QueueExecution<R> submit(Callable<CompletableFuture<R>> job,
int priority, int maxRetries) {
Objects.requireNonNull(job);
UUID jobId = stateMachine.initJob();
@@ -88,7 +89,7 @@ public class PriorityQueueExecutor {
* @param <R> Job result type.
* @return Completable future which will be finished when compute job
finished.
*/
- public <R> QueueExecution<R> submit(Callable<R> job) {
+ public <R> QueueExecution<R> submit(Callable<CompletableFuture<R>> job) {
return submit(job, 0, 0);
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueEntry.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueEntry.java
index c0654f804a..c5842a98b6 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueEntry.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueEntry.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.compute.queue;
+import static org.apache.ignite.internal.util.CompletableFutures.copyStateTo;
+
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
@@ -38,7 +40,7 @@ class QueueEntry<R> implements Runnable,
Comparable<QueueEntry<R>> {
private final CompletableFuture<R> future = new CompletableFuture<>();
- private final Callable<R> jobAction;
+ private final Callable<CompletableFuture<R>> jobAction;
private final int priority;
@@ -47,6 +49,9 @@ class QueueEntry<R> implements Runnable,
Comparable<QueueEntry<R>> {
/** Thread used to run the job, initialized once the job starts executing.
*/
private @Nullable Thread workerThread;
+ /** Future returned from jobAction.call(). */
+ private @Nullable CompletableFuture<R> jobFuture;
+
private final Lock lock = new ReentrantLock();
private volatile boolean isInterrupted;
@@ -57,7 +62,7 @@ class QueueEntry<R> implements Runnable,
Comparable<QueueEntry<R>> {
* @param jobAction Compute job callable.
* @param priority Job priority.
*/
- QueueEntry(Callable<R> jobAction, int priority) {
+ QueueEntry(Callable<CompletableFuture<R>> jobAction, int priority) {
this.jobAction = jobAction;
this.priority = priority;
seqNum = seq.getAndIncrement();
@@ -73,7 +78,14 @@ class QueueEntry<R> implements Runnable,
Comparable<QueueEntry<R>> {
}
try {
- future.complete(jobAction.call());
+ jobFuture = jobAction.call();
+
+ if (jobFuture == null) {
+ // Allow null futures for synchronous jobs.
+ future.complete(null);
+ } else {
+ jobFuture.whenComplete(copyStateTo(future));
+ }
} catch (Throwable e) {
future.completeExceptionally(e);
} finally {
@@ -108,6 +120,11 @@ class QueueEntry<R> implements Runnable,
Comparable<QueueEntry<R>> {
isInterrupted = true;
workerThread.interrupt();
}
+
+ if (jobFuture != null) {
+ isInterrupted = true;
+ jobFuture.cancel(true);
+ }
} finally {
lock.unlock();
}
@@ -147,6 +164,6 @@ class QueueEntry<R> implements Runnable,
Comparable<QueueEntry<R>> {
@Override
public int hashCode() {
- return (int) (seqNum ^ (seqNum >>> 32));
+ return Long.hashCode(seqNum);
}
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
index 3b29d421d8..16612b2da5 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
@@ -42,7 +42,7 @@ class QueueExecutionImpl<R> implements QueueExecution<R> {
private static final IgniteLogger LOG =
Loggers.forClass(QueueExecutionImpl.class);
private final UUID jobId;
- private final Callable<R> job;
+ private final Callable<CompletableFuture<R>> job;
private final ComputeThreadPoolExecutor executor;
private final ComputeStateMachine stateMachine;
@@ -67,7 +67,7 @@ class QueueExecutionImpl<R> implements QueueExecution<R> {
*/
QueueExecutionImpl(
UUID jobId,
- Callable<R> job,
+ Callable<CompletableFuture<R>> job,
int priority,
ComputeThreadPoolExecutor executor,
ComputeStateMachine stateMachine) {
@@ -126,7 +126,7 @@ class QueueExecutionImpl<R> implements QueueExecution<R> {
try {
QueueEntry<R> queueEntry = this.queueEntry;
- if (executor.removeFromQueue(queueEntry)) {
+ if (queueEntry != null && executor.removeFromQueue(queueEntry)) {
this.priority = newPriority;
this.queueEntry = null;
run();
@@ -152,6 +152,7 @@ class QueueExecutionImpl<R> implements QueueExecution<R> {
private void run() {
QueueEntry<R> queueEntry = new QueueEntry<>(() -> {
stateMachine.executeJob(jobId);
+
return job.call();
}, priority);
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
index 32f1883226..c36aae3865 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
@@ -94,7 +94,8 @@ public class TaskExecutionInternal<R> implements
JobExecution<R> {
splitExecution = executorService.submit(
() -> {
MapReduceTask<R> task = instantiateTask(taskClass);
- return new SplitResult<>(task, task.split(context, args));
+
+ return completedFuture(new SplitResult<>(task,
task.split(context, args)));
},
Integer.MAX_VALUE,
0
@@ -115,7 +116,7 @@ public class TaskExecutionInternal<R> implements
JobExecution<R> {
MapReduceTask<R> task =
splitExecution.resultAsync().thenApply(SplitResult::task).join();
return executorService.submit(
- () -> task.reduce(results),
+ () -> completedFuture(task.reduce(results)),
Integer.MAX_VALUE,
0
);
diff --git
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
index afd538acda..ff58fe92cd 100644
---
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
+++
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
@@ -688,15 +688,15 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
private static class SimpleJob implements ComputeJob<String> {
/** {@inheritDoc} */
@Override
- public String execute(JobExecutionContext context, Object... args) {
- return "jobResponse";
+ public CompletableFuture<String> executeAsync(JobExecutionContext
context, Object... args) {
+ return completedFuture("jobResponse");
}
}
private static class FailingJob implements ComputeJob<String> {
/** {@inheritDoc} */
@Override
- public String execute(JobExecutionContext context, Object... args) {
+ public CompletableFuture<String> executeAsync(JobExecutionContext
context, Object... args) {
throw new JobException("Oops", new Exception());
}
}
@@ -710,15 +710,15 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
private static class GetThreadNameJob implements ComputeJob<String> {
/** {@inheritDoc} */
@Override
- public String execute(JobExecutionContext context, Object... args) {
- return Thread.currentThread().getName();
+ public CompletableFuture<String> executeAsync(JobExecutionContext
context, Object... args) {
+ return completedFuture(Thread.currentThread().getName());
}
}
private static class LongJob implements ComputeJob<String> {
/** {@inheritDoc} */
@Override
- public String execute(JobExecutionContext context, Object... args) {
+ public CompletableFuture<String> executeAsync(JobExecutionContext
context, Object... args) {
try {
Thread.sleep(1_000_000);
} catch (InterruptedException e) {
diff --git
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
index 523f99a5f6..059c5aee24 100644
---
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
+++
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.compute.executor;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.compute.JobState.CANCELED;
import static org.apache.ignite.compute.JobState.COMPLETED;
import static org.apache.ignite.compute.JobState.EXECUTING;
@@ -28,6 +29,7 @@ import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
import org.apache.ignite.compute.ComputeJob;
@@ -86,13 +88,13 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
private static class InterruptingJob implements ComputeJob<Integer> {
@Override
- public Integer execute(JobExecutionContext context, Object... args) {
+ public CompletableFuture<Integer> executeAsync(JobExecutionContext
context, Object... args) {
while (true) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- return 0;
+ return completedFuture(0);
}
}
}
@@ -116,11 +118,11 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
private static class CancellingJob implements ComputeJob<Integer> {
@Override
- public Integer execute(JobExecutionContext context, Object... args) {
+ public CompletableFuture<Integer> executeAsync(JobExecutionContext
context, Object... args) {
while (true) {
try {
if (context.isCancelled()) {
- return 0;
+ return completedFuture(0);
}
Thread.sleep(100);
} catch (InterruptedException e) {
@@ -151,7 +153,7 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
private static class RetryJobFail implements ComputeJob<Integer> {
@Override
- public Integer execute(JobExecutionContext context, Object... args) {
+ public CompletableFuture<Integer> executeAsync(JobExecutionContext
context, Object... args) {
AtomicInteger runTimes = (AtomicInteger) args[0];
runTimes.incrementAndGet();
throw new RuntimeException();
@@ -179,13 +181,13 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
private static class RetryJobSuccess implements ComputeJob<Integer> {
@Override
- public Integer execute(JobExecutionContext context, Object... args) {
+ public CompletableFuture<Integer> executeAsync(JobExecutionContext
context, Object... args) {
AtomicInteger runTimes = (AtomicInteger) args[0];
int maxRetries = (int) args[1];
if (runTimes.incrementAndGet() <= maxRetries) {
throw new RuntimeException();
}
- return 0;
+ return completedFuture(0);
}
}
@@ -212,9 +214,9 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
private static class JobSuccess implements ComputeJob<Integer> {
@Override
- public Integer execute(JobExecutionContext context, Object... args) {
+ public CompletableFuture<Integer> executeAsync(JobExecutionContext
context, Object... args) {
AtomicInteger runTimes = (AtomicInteger) args[0];
- return runTimes.incrementAndGet();
+ return completedFuture(runTimes.incrementAndGet());
}
}
@@ -235,8 +237,8 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
private static class SimpleJob implements ComputeJob<Integer> {
@Override
- public Integer execute(JobExecutionContext context, Object... args) {
- return 0;
+ public CompletableFuture<Integer> executeAsync(JobExecutionContext
context, Object... args) {
+ return completedFuture(0);
}
}
}
diff --git
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/loader/JobClassLoaderFactoryTest.java
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/loader/JobClassLoaderFactoryTest.java
index 4887d275ce..d6eeaa81c6 100644
---
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/loader/JobClassLoaderFactoryTest.java
+++
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/loader/JobClassLoaderFactoryTest.java
@@ -18,6 +18,8 @@
package org.apache.ignite.internal.compute.loader;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.getPath;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame;
@@ -29,6 +31,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.DeploymentUnit;
@@ -64,14 +67,14 @@ class JobClassLoaderFactoryTest extends
BaseIgniteAbstractTest {
// then classes from the first unit are loaded from the first
class loader
Class<?> clazz1 = classLoader1.loadClass(UNIT_JOB_CLASS_NAME);
ComputeJob<Integer> job1 = (ComputeJob<Integer>)
clazz1.getDeclaredConstructor().newInstance();
- Integer result1 = job1.execute(null);
- assertEquals(1, result1);
+ CompletableFuture<Integer> result1 = job1.executeAsync(null);
+ assertThat(result1, willBe(1));
// and classes from the second unit are loaded from the second
class loader
Class<?> clazz2 = classLoader2.loadClass(UNIT_JOB_CLASS_NAME);
ComputeJob<String> job2 = (ComputeJob<String>)
clazz2.getDeclaredConstructor().newInstance();
- String result2 = job2.execute(null);
- assertEquals("Hello World!", result2);
+ CompletableFuture<String> result2 = job2.executeAsync(null);
+ assertThat(result2, willBe("Hello World!"));
}
}
@@ -90,8 +93,8 @@ class JobClassLoaderFactoryTest extends
BaseIgniteAbstractTest {
// and classes are loaded in the aplhabetical order
ComputeJob<Integer> job1 = (ComputeJob<Integer>)
unitJobClass.getDeclaredConstructor().newInstance();
- Integer result1 = job1.execute(null);
- assertEquals(1, result1);
+ CompletableFuture<Integer> result1 = job1.executeAsync(null);
+ assertThat(result1, willBe(1));
Class<?> job1UtilityClass =
classLoader.loadClass(JOB1_UTILITY_CLASS_NAME);
assertNotNull(job1UtilityClass);
diff --git
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
index adc87ffdc2..eb68ff790d 100644
---
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
+++
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.compute.queue;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.compute.JobState.CANCELED;
import static org.apache.ignite.compute.JobState.COMPLETED;
import static org.apache.ignite.compute.JobState.EXECUTING;
@@ -231,7 +232,7 @@ public class PriorityQueueExecutorTest extends
BaseIgniteAbstractTest {
try {
latch.await();
} catch (InterruptedException e) {
- return 0;
+ return completedFuture(0);
}
}
});
@@ -254,7 +255,7 @@ public class PriorityQueueExecutorTest extends
BaseIgniteAbstractTest {
CountDownLatch latch = new CountDownLatch(1);
QueueExecution<Object> execution = priorityQueueExecutor.submit(() -> {
latch.await();
- return 0;
+ return completedFuture(0);
});
JobStatus executingStatus = await().until(execution::status,
jobStatusWithState(EXECUTING));
@@ -272,7 +273,7 @@ public class PriorityQueueExecutorTest extends
BaseIgniteAbstractTest {
void completedTaskCancel() {
initExecutor(1);
- QueueExecution<Object> execution = priorityQueueExecutor.submit(() ->
0);
+ QueueExecution<Object> execution = priorityQueueExecutor.submit(() ->
completedFuture(0));
await().until(execution::status, jobStatusWithState(COMPLETED));
@@ -290,13 +291,13 @@ public class PriorityQueueExecutorTest extends
BaseIgniteAbstractTest {
CountDownLatch latch = new CountDownLatch(1);
QueueExecution<Object> runningExecution =
priorityQueueExecutor.submit(() -> {
latch.await();
- return 0;
+ return completedFuture(0);
});
await().until(runningExecution::status, jobStatusWithState(EXECUTING));
// Put the task in the queue
- QueueExecution<Object> execution = priorityQueueExecutor.submit(() ->
0);
+ QueueExecution<Object> execution = priorityQueueExecutor.submit(() ->
completedFuture(0));
await().until(execution::status, jobStatusWithState(QUEUED));
// Cancel the task
@@ -340,7 +341,7 @@ public class PriorityQueueExecutorTest extends
BaseIgniteAbstractTest {
if (runTimes.incrementAndGet() <= maxRetries) {
throw new RuntimeException();
}
- return 0;
+ return completedFuture(0);
}, 0, maxRetries);
await().until(execution::status, jobStatusWithState(COMPLETED));
@@ -385,7 +386,7 @@ public class PriorityQueueExecutorTest extends
BaseIgniteAbstractTest {
QueueExecution<Integer> runningExecution3 =
priorityQueueExecutor.submit(() -> {
latch3.await();
- return 2;
+ return completedFuture(2);
}, 1, 0);
CompletableFuture<Integer> task3 = runningExecution3.resultAsync();
@@ -436,7 +437,7 @@ public class PriorityQueueExecutorTest extends
BaseIgniteAbstractTest {
QueueExecution<Integer> runningExecution =
priorityQueueExecutor.submit(() -> {
latch3.await();
- return 2;
+ return completedFuture(2);
}, 1, 0);
CompletableFuture<Integer> task3 = runningExecution.resultAsync();
@@ -491,7 +492,7 @@ public class PriorityQueueExecutorTest extends
BaseIgniteAbstractTest {
// Start two tasks
QueueExecution<Integer> runningExecution1 =
priorityQueueExecutor.submit(() -> {
latch1.await();
- return 2;
+ return completedFuture(2);
}, 1, 0);
CompletableFuture<Integer> task1 = runningExecution1.resultAsync();
@@ -528,7 +529,7 @@ public class PriorityQueueExecutorTest extends
BaseIgniteAbstractTest {
// Start three tasks
QueueExecution<Integer> runningExecution =
priorityQueueExecutor.submit(() -> {
latch1.await();
- return 2;
+ return completedFuture(2);
}, 1, 0);
CompletableFuture<Integer> task1 = runningExecution.resultAsync();
@@ -582,6 +583,6 @@ public class PriorityQueueExecutorTest extends
BaseIgniteAbstractTest {
}
private <R> CompletableFuture<R> submit(Callable<R> job, int priority, int
maxRetries) {
- return priorityQueueExecutor.submit(job, priority,
maxRetries).resultAsync();
+ return priorityQueueExecutor.submit(() -> completedFuture(job.call()),
priority, maxRetries).resultAsync();
}
}
diff --git
a/modules/compute/src/unit1/java/org/apache/ignite/internal/compute/UnitJob.java
b/modules/compute/src/unit1/java/org/apache/ignite/internal/compute/UnitJob.java
index d1161122de..2b5fe0d9ca 100644
---
a/modules/compute/src/unit1/java/org/apache/ignite/internal/compute/UnitJob.java
+++
b/modules/compute/src/unit1/java/org/apache/ignite/internal/compute/UnitJob.java
@@ -17,13 +17,16 @@
package org.apache.ignite.internal.compute;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.util.concurrent.CompletableFuture;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.JobExecutionContext;
/** Compute job. */
public class UnitJob implements ComputeJob<Integer> {
@Override
- public Integer execute(JobExecutionContext context, Object... args) {
- return 1;
+ public CompletableFuture<Integer> executeAsync(JobExecutionContext
context, Object... args) {
+ return completedFuture(1);
}
}
diff --git
a/modules/compute/src/unit2/java/org/apache/ignite/internal/compute/UnitJob.java
b/modules/compute/src/unit2/java/org/apache/ignite/internal/compute/UnitJob.java
index 51163e21ae..631e225821 100644
---
a/modules/compute/src/unit2/java/org/apache/ignite/internal/compute/UnitJob.java
+++
b/modules/compute/src/unit2/java/org/apache/ignite/internal/compute/UnitJob.java
@@ -17,13 +17,16 @@
package org.apache.ignite.internal.compute;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.util.concurrent.CompletableFuture;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.JobExecutionContext;
/** Compute job. */
public class UnitJob implements ComputeJob<String> {
@Override
- public String execute(JobExecutionContext context, Object... args) {
- return "Hello World!";
+ public CompletableFuture<String> executeAsync(JobExecutionContext context,
Object... args) {
+ return completedFuture("Hello World!");
}
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
index a3ee3a9c08..812e760dd1 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
@@ -416,7 +416,7 @@ namespace Apache.Ignite.Tests.Compute
var str = ex.ToString();
StringAssert.Contains(
- "at
org.apache.ignite.internal.runner.app.PlatformTestNodeRunner$ExceptionJob.execute(PlatformTestNodeRunner.java:",
+ "at
org.apache.ignite.internal.runner.app.PlatformTestNodeRunner$ExceptionJob.executeAsync(PlatformTestNodeRunner.java:",
str);
}
diff --git
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/compute/ItComputeControllerTest.java
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/compute/ItComputeControllerTest.java
index 79b3435e9e..767fbc4a15 100644
---
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/compute/ItComputeControllerTest.java
+++
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/compute/ItComputeControllerTest.java
@@ -40,6 +40,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.JobDescriptor;
@@ -396,7 +397,7 @@ public class ItComputeControllerTest extends
ClusterPerClassIntegrationTest {
private static class BlockingJob implements ComputeJob<String> {
/** {@inheritDoc} */
@Override
- public String execute(JobExecutionContext context, Object... args) {
+ public CompletableFuture<String> executeAsync(JobExecutionContext
context, Object... args) {
synchronized (LOCK) {
try {
LOCK.wait();
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
index 2539d24935..e8c6f98390 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.runner.app;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toList;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.MAX_TIME_PRECISION;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.createZone;
@@ -550,12 +551,12 @@ public class PlatformTestNodeRunner {
@SuppressWarnings("unused") // Used by platform tests.
private static class CreateTableJob implements ComputeJob<String> {
@Override
- public String execute(JobExecutionContext context, Object... args) {
+ public CompletableFuture<String> executeAsync(JobExecutionContext
context, Object... args) {
String tableName = (String) args[0];
context.ignite().sql().execute(null, "CREATE TABLE " + tableName +
"(key BIGINT PRIMARY KEY, val INT)");
- return tableName;
+ return completedFuture(tableName);
}
}
@@ -565,11 +566,11 @@ public class PlatformTestNodeRunner {
@SuppressWarnings("unused") // Used by platform tests.
private static class DropTableJob implements ComputeJob<String> {
@Override
- public String execute(JobExecutionContext context, Object... args) {
+ public CompletableFuture<String> executeAsync(JobExecutionContext
context, Object... args) {
String tableName = (String) args[0];
context.ignite().sql().execute(null, "DROP TABLE " + tableName +
"");
- return tableName;
+ return completedFuture(tableName);
}
}
@@ -579,7 +580,7 @@ public class PlatformTestNodeRunner {
@SuppressWarnings("unused") // Used by platform tests.
private static class ExceptionJob implements ComputeJob<String> {
@Override
- public String execute(JobExecutionContext context, Object... args) {
+ public CompletableFuture<String> executeAsync(JobExecutionContext
context, Object... args) {
throw new RuntimeException("Test exception: " + args[0]);
}
}
@@ -590,7 +591,7 @@ public class PlatformTestNodeRunner {
@SuppressWarnings("unused") // Used by platform tests.
private static class CheckedExceptionJob implements ComputeJob<String> {
@Override
- public String execute(JobExecutionContext context, Object... args) {
+ public CompletableFuture<String> executeAsync(JobExecutionContext
context, Object... args) {
throw new CompletionException(new
IgniteCheckedException(Common.NODE_LEFT_ERR, "TestCheckedEx: " + args[0]));
}
}
@@ -601,7 +602,7 @@ public class PlatformTestNodeRunner {
@SuppressWarnings("unused") // Used by platform tests.
private static class ColocationHashJob implements ComputeJob<Integer> {
@Override
- public Integer execute(JobExecutionContext context, Object... args) {
+ public CompletableFuture<Integer> executeAsync(JobExecutionContext
context, Object... args) {
var columnCount = (int) args[0];
var buf = (byte[]) args[1];
var timePrecision = (int) args[2];
@@ -712,7 +713,7 @@ public class PlatformTestNodeRunner {
try {
Row row = marsh.marshal(tuple);
- return row.colocationHash();
+ return completedFuture(row.colocationHash());
} catch (TupleMarshallerException e) {
throw new RuntimeException(e);
}
@@ -725,7 +726,7 @@ public class PlatformTestNodeRunner {
@SuppressWarnings("unused") // Used by platform tests.
private static class TableRowColocationHashJob implements
ComputeJob<Integer> {
@Override
- public Integer execute(JobExecutionContext context, Object... args) {
+ public CompletableFuture<Integer> executeAsync(JobExecutionContext
context, Object... args) {
String tableName = (String) args[0];
int i = (int) args[1];
Tuple key = Tuple.create().set("id", 1 + i).set("id0", 2L +
i).set("id1", "3" + i);
@@ -736,7 +737,7 @@ public class PlatformTestNodeRunner {
TupleMarshaller marsh = view.marshaller(1);
try {
- return marsh.marshal(key).colocationHash();
+ return completedFuture(marsh.marshal(key).colocationHash());
} catch (TupleMarshallerException e) {
throw new RuntimeException(e);
}
@@ -749,7 +750,7 @@ public class PlatformTestNodeRunner {
@SuppressWarnings("unused") // Used by platform tests.
private static class EnableAuthenticationJob implements ComputeJob<Void> {
@Override
- public Void execute(JobExecutionContext context, Object... args) {
+ public CompletableFuture<Void> executeAsync(JobExecutionContext
context, Object... args) {
boolean enable = ((Integer) args[0]) != 0;
@SuppressWarnings("resource") IgniteImpl ignite = (IgniteImpl)
context.ignite();
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
index f6d310ae9a..4bf18bc6df 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.runner.app.client;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.ignite.compute.JobState.CANCELED;
import static org.apache.ignite.compute.JobState.COMPLETED;
import static org.apache.ignite.compute.JobState.EXECUTING;
@@ -27,6 +29,7 @@ import static
org.apache.ignite.internal.testframework.matchers.CompletableFutur
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.will;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.internal.testframework.matchers.JobStatusMatcher.jobStatusWithState;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Compute.COMPUTE_JOB_FAILED_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Table.COLUMN_ALREADY_EXISTS_ERR;
@@ -86,6 +89,7 @@ import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.mapper.Mapper;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
+import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
@@ -157,10 +161,13 @@ public class ItThinClientComputeTest extends
ItAbstractThinClientTest {
assertThat(execution.changePriorityAsync(0), willBe(false));
}
- @Test
- void testCancelOnSpecificNodeAsync() {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testCancelOnSpecificNodeAsync(boolean asyncJob) {
int sleepMs = 1_000_000;
- JobDescriptor sleepJob = JobDescriptor.builder(SleepJob.class).build();
+ JobDescriptor sleepJob = JobDescriptor
+ .builder(asyncJob ? AsyncSleepJob.class : SleepJob.class)
+ .build();
JobExecution<String> execution1 =
client().compute().submit(Set.of(node(0)), sleepJob, sleepMs);
JobExecution<String> execution2 =
client().compute().submit(Set.of(node(1)), sleepJob, sleepMs);
@@ -326,19 +333,21 @@ public class ItThinClientComputeTest extends
ItAbstractThinClientTest {
assertNull(cause.getCause()); // No stack trace by default.
}
- @Test
- void testExceptionInJobPropagatesToClientWithClassAndMessageAsync() {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testExceptionInJobPropagatesToClientWithClassAndMessageAsync(boolean
asyncJob) {
IgniteException cause = getExceptionInJobExecutionAsync(
- client().compute().submit(Set.of(node(0)),
JobDescriptor.builder(ExceptionJob.class).build())
+ client().compute().submit(Set.of(node(0)),
JobDescriptor.builder(ExceptionJob.class).build(), asyncJob)
);
assertComputeExceptionWithClassAndMessage(cause);
}
- @Test
- void testExceptionInJobPropagatesToClientWithClassAndMessageSync() {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testExceptionInJobPropagatesToClientWithClassAndMessageSync(boolean
asyncJob) {
IgniteException cause = getExceptionInJobExecutionSync(
- () -> client().compute().execute(Set.of(node(0)),
JobDescriptor.builder(ExceptionJob.class).build())
+ () -> client().compute().execute(Set.of(node(0)),
JobDescriptor.builder(ExceptionJob.class).build(), asyncJob)
);
assertComputeExceptionWithClassAndMessage(cause);
@@ -509,7 +518,7 @@ public class ItThinClientComputeTest extends
ItAbstractThinClientTest {
assertThat(cause.getCause().getMessage(), containsString(
"Caused by: java.lang.ArithmeticException: math err" +
System.lineSeparator()
+ "\tat
org.apache.ignite.internal.runner.app.client.ItThinClientComputeTest$"
- + "ExceptionJob.execute(ItThinClientComputeTest.java:")
+ +
"ExceptionJob.executeAsync(ItThinClientComputeTest.java:")
);
}
@@ -737,39 +746,47 @@ public class ItThinClientComputeTest extends
ItAbstractThinClientTest {
private static class NodeNameJob implements ComputeJob<String> {
@Override
- public String execute(JobExecutionContext context, Object... args) {
- return context.ignite().name() +
Arrays.stream(args).map(Object::toString).collect(Collectors.joining("_"));
+ public CompletableFuture<String> executeAsync(JobExecutionContext
context, Object... args) {
+ return completedFuture(
+ context.ignite().name() +
Arrays.stream(args).map(Object::toString).collect(Collectors.joining("_")));
}
}
private static class ConcatJob implements ComputeJob<String> {
@Override
- public String execute(JobExecutionContext context, Object... args) {
+ public CompletableFuture<String> executeAsync(JobExecutionContext
context, Object... args) {
if (args == null) {
- return null;
+ return nullCompletedFuture();
}
- return Arrays.stream(args).map(o -> o == null ? "null" :
o.toString()).collect(Collectors.joining("_"));
+ return completedFuture(
+ Arrays.stream(args).map(o -> o == null ? "null" :
o.toString()).collect(Collectors.joining("_")));
}
}
private static class IgniteExceptionJob implements ComputeJob<String> {
@Override
- public String execute(JobExecutionContext context, Object... args) {
+ public CompletableFuture<String> executeAsync(JobExecutionContext
context, Object... args) {
throw new CustomException(TRACE_ID, COLUMN_ALREADY_EXISTS_ERR,
"Custom job error", null);
}
}
private static class ExceptionJob implements ComputeJob<String> {
@Override
- public String execute(JobExecutionContext context, Object... args) {
- throw new ArithmeticException("math err");
+ public CompletableFuture<String> executeAsync(JobExecutionContext
context, Object... args) {
+ boolean asyncJob = args.length > 0 && (Boolean) args[0];
+
+ if (asyncJob) {
+ return failedFuture(new ArithmeticException("math err"));
+ } else {
+ throw new ArithmeticException("math err");
+ }
}
}
private static class EchoJob implements ComputeJob<Object> {
@Override
- public Object execute(JobExecutionContext context, Object... args) {
+ public CompletableFuture<Object> executeAsync(JobExecutionContext
context, Object... args) {
var value = args[0];
if (!(value instanceof byte[])) {
@@ -778,13 +795,13 @@ public class ItThinClientComputeTest extends
ItAbstractThinClientTest {
assertEquals(expectedString, valueString, "Unexpected string
representation of value");
}
- return args[0];
+ return completedFuture(args[0]);
}
}
private static class SleepJob implements ComputeJob<Void> {
@Override
- public Void execute(JobExecutionContext context, Object... args) {
+ public @Nullable CompletableFuture<Void>
executeAsync(JobExecutionContext context, Object... args) {
try {
Thread.sleep((Integer) args[0]);
} catch (InterruptedException e) {
@@ -795,10 +812,23 @@ public class ItThinClientComputeTest extends
ItAbstractThinClientTest {
}
}
+ private static class AsyncSleepJob implements ComputeJob<Void> {
+ @Override
+ public @Nullable CompletableFuture<Void>
executeAsync(JobExecutionContext context, Object... args) {
+ return CompletableFuture.runAsync(() -> {
+ try {
+ Thread.sleep((Integer) args[0]);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ }
+
private static class DecimalJob implements ComputeJob<BigDecimal> {
@Override
- public BigDecimal execute(JobExecutionContext context, Object... args)
{
- return new BigDecimal((String) args[0]).setScale((Integer)
args[1], RoundingMode.HALF_UP);
+ public CompletableFuture<BigDecimal> executeAsync(JobExecutionContext
context, Object... args) {
+ return completedFuture(new BigDecimal((String)
args[0]).setScale((Integer) args[1], RoundingMode.HALF_UP));
}
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientPartitionAwarenessTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientPartitionAwarenessTest.java
index 8d7c85b5d3..ded59c2b37 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientPartitionAwarenessTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientPartitionAwarenessTest.java
@@ -17,11 +17,13 @@
package org.apache.ignite.internal.runner.app.client;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.compute.ComputeJob;
@@ -121,9 +123,10 @@ public class ItThinClientPartitionAwarenessTest extends
ItAbstractThinClientTest
private static class NodeNameJob implements ComputeJob<String> {
@Override
- public String execute(JobExecutionContext context, Object... args) {
+ public CompletableFuture<String> executeAsync(JobExecutionContext
context, Object... args) {
//noinspection resource
- return context.ignite().name() +
Arrays.stream(args).map(Object::toString).collect(Collectors.joining("_"));
+ return completedFuture(
+ context.ignite().name() +
Arrays.stream(args).map(Object::toString).collect(Collectors.joining("_")));
}
}
}