This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 8df4e54430 IGNITE-23014 IgniteCompute transparency with respect to
node restart (#4357)
8df4e54430 is described below
commit 8df4e54430e3b6370fac0cb1fa96eb8d6792fc86
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Mon Sep 9 18:43:29 2024 +0400
IGNITE-23014 IgniteCompute transparency with respect to node restart (#4357)
---
.../threading/ItComputeApiThreadingTest.java | 97 +++++++++++++++++++++-
.../ignite/internal/app/AsyncApiOperation.java | 16 +++-
.../app/ItShutDownServerApiReferencesTest.java | 2 +-
.../org/apache/ignite/internal/app/NoOpJob.java | 31 +++++++
.../ignite/internal/app/NoOpMapReduceTask.java | 47 +++++++++++
.../org/apache/ignite/internal/app/References.java | 9 ++
.../ignite/internal/app/SyncApiOperation.java | 30 ++++++-
.../internal/restart/RestartProofIgnite.java | 5 +-
.../restart/RestartProofIgniteCompute.java | 93 +++++++++++++++++++++
9 files changed, 323 insertions(+), 7 deletions(-)
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/threading/ItComputeApiThreadingTest.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/threading/ItComputeApiThreadingTest.java
index cc6149a7b5..8edd9d955a 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/threading/ItComputeApiThreadingTest.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/threading/ItComputeApiThreadingTest.java
@@ -27,7 +27,10 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.is;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.apache.ignite.compute.ComputeJob;
@@ -36,6 +39,11 @@ import org.apache.ignite.compute.JobDescriptor;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobExecutionContext;
import org.apache.ignite.compute.JobTarget;
+import org.apache.ignite.compute.TaskDescriptor;
+import org.apache.ignite.compute.task.MapReduceJob;
+import org.apache.ignite.compute.task.MapReduceTask;
+import org.apache.ignite.compute.task.TaskExecution;
+import org.apache.ignite.compute.task.TaskExecutionContext;
import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
import org.apache.ignite.internal.compute.IgniteComputeImpl;
import org.apache.ignite.internal.wrapper.Wrappers;
@@ -44,6 +52,7 @@ import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.mapper.Mapper;
+import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junitpioneer.jupiter.cartesian.CartesianTest;
@@ -141,6 +150,36 @@ class ItComputeApiThreadingTest extends
ClusterPerClassIntegrationTest {
return Set.of(unwrapIgniteImpl(CLUSTER.node(1)).node());
}
+ @CartesianTest
+ void taskExecutionFuturesCompleteInContinuationsPool(
+ @Enum ComputeMapReduceOperation mapReduceOperation,
+ @Enum TaskExecutionAsyncOperation executionOperation
+ ) {
+ TaskExecution<?> execution =
mapReduceOperation.executeOn(computeForPublicUse());
+
+ CompletableFuture<Thread> completerFuture =
executionOperation.executeOn(execution)
+ .thenApply(unused -> currentThread());
+
+ assertThat(completerFuture, willBe(
+ either(is(currentThread())).or(asyncContinuationPool())
+ ));
+ }
+
+ @CartesianTest
+ void
taskExecutionFuturesFromInternalCallsAreNotResubmittedToContinuationsPool(
+ @Enum ComputeMapReduceOperation submitOperation,
+ @Enum TaskExecutionAsyncOperation executionOperation
+ ) {
+ TaskExecution<?> execution =
submitOperation.executeOn(computeForInternalUse());
+
+ CompletableFuture<Thread> completerFuture =
executionOperation.executeOn(execution)
+ .thenApply(unused -> currentThread());
+
+ assertThat(completerFuture, willBe(
+ either(is(currentThread())).or(anIgniteThread())
+ ));
+ }
+
private static class NoOpJob implements ComputeJob<Void, String> {
@Override
public CompletableFuture<String> executeAsync(JobExecutionContext
context, Void input) {
@@ -148,6 +187,23 @@ class ItComputeApiThreadingTest extends
ClusterPerClassIntegrationTest {
}
}
+ private static class NoOpMapReduceTask implements MapReduceTask<Void,
Void, String, Void> {
+ @Override
+ public CompletableFuture<List<MapReduceJob<Void, String>>>
splitAsync(TaskExecutionContext taskContext, @Nullable Void input) {
+ return completedFuture(List.of(
+ MapReduceJob.<Void, String>builder()
+
.jobDescriptor(JobDescriptor.builder(NoOpJob.class).build())
+ .nodes(taskContext.ignite().clusterNodes())
+ .build()
+ ));
+ }
+
+ @Override
+ public CompletableFuture<Void> reduceAsync(TaskExecutionContext
taskContext, Map<UUID, String> results) {
+ return completedFuture(null);
+ }
+ }
+
private enum ComputeAsyncOperation {
EXECUTE_ASYNC(compute -> compute.executeAsync(
JobTarget.anyNode(justNonEntryNode()),
@@ -164,7 +220,10 @@ class ItComputeApiThreadingTest extends
ClusterPerClassIntegrationTest {
null)),
EXECUTE_BROADCAST_ASYNC(compute ->
compute.executeBroadcastAsync(justNonEntryNode(),
JobDescriptor.builder(NoOpJob.class).build(),
- null));
+ null)),
+ EXECUTE_MAP_REDUCE_ASYNC(compute -> compute
+
.executeMapReduceAsync(TaskDescriptor.builder(NoOpMapReduceTask.class).build(),
null)
+ );
private final Function<IgniteCompute, CompletableFuture<?>> action;
@@ -222,4 +281,40 @@ class ItComputeApiThreadingTest extends
ClusterPerClassIntegrationTest {
return action.apply((JobExecution<Object>) execution);
}
}
+
+ private enum ComputeMapReduceOperation {
+ SUBMIT_MAP_REDUCE(compute -> compute
+
.submitMapReduce(TaskDescriptor.builder(NoOpMapReduceTask.class).build(), null)
+ );
+
+ private final Function<IgniteCompute, TaskExecution<?>> action;
+
+ ComputeMapReduceOperation(Function<IgniteCompute, TaskExecution<?>>
action) {
+ this.action = action;
+ }
+
+ TaskExecution<?> executeOn(IgniteCompute compute) {
+ return action.apply(compute);
+ }
+ }
+
+ private enum TaskExecutionAsyncOperation {
+ STATES_ASYNC(execution -> execution.statesAsync()),
+ IDS_ASYNC(execution -> execution.idsAsync()),
+ RESULT_ASYNC(execution -> execution.resultAsync()),
+ STATE_ASYNC(execution -> execution.stateAsync()),
+ ID_ASYNC(execution -> execution.idAsync()),
+ CANCEL_ASYNC(execution -> execution.cancelAsync()),
+ CHANGE_PRIORITY_ASYNC(execution -> execution.changePriorityAsync(1));
+
+ private final Function<TaskExecution<Object>, CompletableFuture<?>>
action;
+
+ TaskExecutionAsyncOperation(Function<TaskExecution<Object>,
CompletableFuture<?>> action) {
+ this.action = action;
+ }
+
+ CompletableFuture<?> executeOn(TaskExecution<?> execution) {
+ return action.apply((TaskExecution<Object>) execution);
+ }
+ }
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/AsyncApiOperation.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/AsyncApiOperation.java
index f94df5e7ce..128b964829 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/AsyncApiOperation.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/AsyncApiOperation.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.app;
+import static org.apache.ignite.compute.JobTarget.anyNode;
import static org.apache.ignite.internal.app.ApiReferencesTestUtils.FULL_TUPLE;
import static org.apache.ignite.internal.app.ApiReferencesTestUtils.KEY_TUPLE;
import static
org.apache.ignite.internal.app.ApiReferencesTestUtils.SELECT_IDS_QUERY;
@@ -28,8 +29,11 @@ import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
+import org.apache.ignite.compute.JobDescriptor;
+import org.apache.ignite.compute.TaskDescriptor;
import org.apache.ignite.internal.streamer.SimplePublisher;
import org.apache.ignite.internal.table.partition.HashPartition;
import org.apache.ignite.sql.BatchedArguments;
@@ -128,7 +132,17 @@ enum AsyncApiOperation {
// SQL_EXECUTE_STATEMENT_WITH_MAPPER(refs -> refs.sql.executeAsync(null,
Mapper.of(Integer.class), refs.selectIdsStatement)),
SQL_EXECUTE_BATCH(refs -> refs.sql.executeBatchAsync(null, UPDATE_QUERY,
BatchedArguments.of(999))),
SQL_EXECUTE_BATCH_STATEMENT(refs -> refs.sql.executeBatchAsync(null,
refs.updateStatement, BatchedArguments.of(999))),
- SQL_EXECUTE_SCRIPT(refs -> refs.sql.executeScriptAsync(SELECT_IDS_QUERY));
+ SQL_EXECUTE_SCRIPT(refs -> refs.sql.executeScriptAsync(SELECT_IDS_QUERY)),
+
+ COMPUTE_EXECUTE(refs -> refs.compute.executeAsync(
+ anyNode(refs.clusterNodes),
JobDescriptor.builder(NoOpJob.class).build(), null
+ )),
+ COMPUTE_EXECUTE_BROADCAST(refs -> refs.compute.executeBroadcastAsync(
+ Set.copyOf(refs.clusterNodes),
+ JobDescriptor.builder(NoOpJob.class).build(),
+ null
+ )),
+ COMPUTE_EXECUTE_MAP_REDUCE(refs ->
refs.compute.executeMapReduceAsync(TaskDescriptor.builder(NoOpMapReduceTask.class).build(),
null));
private final Function<References, CompletableFuture<?>> action;
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/ItShutDownServerApiReferencesTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/ItShutDownServerApiReferencesTest.java
index 3355a0b567..84da36a4b0 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/ItShutDownServerApiReferencesTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/ItShutDownServerApiReferencesTest.java
@@ -68,7 +68,7 @@ class ItShutDownServerApiReferencesTest extends
ClusterPerClassIntegrationTest {
@ParameterizedTest
@EnumSource(AsyncApiOperation.class)
- void asyncOperationsWorkAfterRestart(AsyncApiOperation operation) {
+ void asyncOperationsThrowAfterShutdown(AsyncApiOperation operation) {
assertThat(operation.execute(beforeShutdown),
willThrow(IgniteException.class, 10, SECONDS, "The node is already shut
down."));
}
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/NoOpJob.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/NoOpJob.java
new file mode 100644
index 0000000000..c0cef833fd
--- /dev/null
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/NoOpJob.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.app;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobExecutionContext;
+
+class NoOpJob implements ComputeJob<Void, String> {
+ @Override
+ public CompletableFuture<String> executeAsync(JobExecutionContext context,
Void input) {
+ return completedFuture("ok");
+ }
+}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/NoOpMapReduceTask.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/NoOpMapReduceTask.java
new file mode 100644
index 0000000000..4babb32252
--- /dev/null
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/NoOpMapReduceTask.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.app;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.JobDescriptor;
+import org.apache.ignite.compute.task.MapReduceJob;
+import org.apache.ignite.compute.task.MapReduceTask;
+import org.apache.ignite.compute.task.TaskExecutionContext;
+import org.jetbrains.annotations.Nullable;
+
+class NoOpMapReduceTask implements MapReduceTask<Void, Void, String, Void> {
+ @Override
+ public CompletableFuture<List<MapReduceJob<Void, String>>>
splitAsync(TaskExecutionContext taskContext, @Nullable Void input) {
+ return completedFuture(List.of(
+ MapReduceJob.<Void, String>builder()
+
.jobDescriptor(JobDescriptor.builder(NoOpJob.class).build())
+ .nodes(taskContext.ignite().clusterNodes())
+ .build()
+ ));
+ }
+
+ @Override
+ public CompletableFuture<Void> reduceAsync(TaskExecutionContext
taskContext, Map<UUID, String> results) {
+ return completedFuture(null);
+ }
+}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/References.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/References.java
index 16efeddb7d..8535f983c8 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/References.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/References.java
@@ -22,8 +22,11 @@ import static
org.apache.ignite.internal.app.ApiReferencesTestUtils.SELECT_IDS_Q
import static
org.apache.ignite.internal.app.ApiReferencesTestUtils.TEST_TABLE_NAME;
import static
org.apache.ignite.internal.app.ApiReferencesTestUtils.UPDATE_QUERY;
+import java.util.Collection;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteServer;
+import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.sql.Statement;
import org.apache.ignite.table.IgniteTables;
@@ -44,6 +47,9 @@ class References {
final IgniteTables tables;
final IgniteTransactions transactions;
final IgniteSql sql;
+ final IgniteCompute compute;
+
+ final Collection<ClusterNode> clusterNodes;
final Table table; // From table().
final Table tableFromTableAsync;
@@ -69,6 +75,9 @@ class References {
tables = ignite.tables();
transactions = ignite.transactions();
sql = ignite.sql();
+ compute = ignite.compute();
+
+ clusterNodes = ignite.clusterNodes();
table = tables.table(TEST_TABLE_NAME);
tableFromTableAsync = tables.tableAsync(TEST_TABLE_NAME).get(10,
SECONDS);
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/SyncApiOperation.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/SyncApiOperation.java
index 8f6ff55d20..d187c139e5 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/SyncApiOperation.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/SyncApiOperation.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.app;
+import static org.apache.ignite.compute.JobTarget.anyNode;
import static org.apache.ignite.internal.app.ApiReferencesTestUtils.FULL_TUPLE;
import static org.apache.ignite.internal.app.ApiReferencesTestUtils.KEY_TUPLE;
import static
org.apache.ignite.internal.app.ApiReferencesTestUtils.SELECT_IDS_QUERY;
@@ -26,7 +27,10 @@ import static
org.apache.ignite.internal.app.ApiReferencesTestUtils.VALUE_TUPLE;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.function.Consumer;
+import org.apache.ignite.compute.JobDescriptor;
+import org.apache.ignite.compute.TaskDescriptor;
import org.apache.ignite.sql.BatchedArguments;
import org.apache.ignite.table.mapper.Mapper;
@@ -124,7 +128,28 @@ enum SyncApiOperation {
// SQL_EXECUTE_STATEMENT_WITH_MAPPER(refs -> refs.sql.execute(null,
Mapper.of(Integer.class), refs.selectIdsStatement)),
SQL_EXECUTE_BATCH(refs -> refs.sql.executeBatch(null, UPDATE_QUERY,
BatchedArguments.of(999))),
SQL_EXECUTE_BATCH_STATEMENT(refs -> refs.sql.executeBatch(null,
refs.updateStatement, BatchedArguments.of(999))),
- SQL_EXECUTE_SCRIPT(refs -> refs.sql.executeScript(SELECT_IDS_QUERY));
+ SQL_EXECUTE_SCRIPT(refs -> refs.sql.executeScript(SELECT_IDS_QUERY)),
+
+ COMPUTE_SUBMIT(refs -> refs.compute.submit(anyNode(refs.clusterNodes),
JobDescriptor.builder(NoOpJob.class).build(), null)),
+ COMPUTE_EXECUTE(refs -> refs.compute.execute(anyNode(refs.clusterNodes),
JobDescriptor.builder(NoOpJob.class).build(), null)),
+ COMPUTE_SUBMIT_BROADCAST(refs -> refs.compute.submitBroadcast(
+ Set.copyOf(refs.clusterNodes),
+ JobDescriptor.builder(NoOpJob.class).build(),
+ null
+ )),
+ COMPUTE_EXECUTE_BROADCAST(refs -> refs.compute.executeBroadcast(
+ Set.copyOf(refs.clusterNodes),
+ JobDescriptor.builder(NoOpJob.class).build(),
+ null
+ )),
+ COMPUTE_SUBMIT_MAP_REDUCE(refs -> refs.compute.submitMapReduce(
+ TaskDescriptor.builder(NoOpMapReduceTask.class).build(),
+ null
+ )),
+ COMPUTE_EXECUTE_MAP_REDUCE(refs -> refs.compute.executeMapReduce(
+ TaskDescriptor.builder(NoOpMapReduceTask.class).build(),
+ null
+ ));
private final Consumer<References> action;
@@ -139,6 +164,7 @@ enum SyncApiOperation {
boolean worksAfterShutdown() {
return this == IGNITE_TABLES
|| this == IGNITE_TRANSACTIONS
- || this == IGNITE_SQL;
+ || this == IGNITE_SQL
+ || this == IGNITE_COMPUTE;
}
}
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgnite.java
b/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgnite.java
index d86617527e..07ebd6bd07 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgnite.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgnite.java
@@ -40,6 +40,7 @@ public class RestartProofIgnite implements Ignite, Wrapper {
private final IgniteTables tables;
private final IgniteTransactions transactions;
private final IgniteSql sql;
+ private final IgniteCompute compute;
/**
* Constructor.
@@ -50,6 +51,7 @@ public class RestartProofIgnite implements Ignite, Wrapper {
tables = new RestartProofIgniteTables(attachmentLock);
transactions = new RestartProofIgniteTransactions(attachmentLock);
sql = new RestartProofIgniteSql(attachmentLock);
+ compute = new RestartProofIgniteCompute(attachmentLock);
}
@Override
@@ -74,8 +76,7 @@ public class RestartProofIgnite implements Ignite, Wrapper {
@Override
public IgniteCompute compute() {
- // TODO: IGNITE-23014 - add a wrapper.
- return attachmentLock.attached(Ignite::compute);
+ return compute;
}
@Override
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgniteCompute.java
b/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgniteCompute.java
new file mode 100644
index 0000000000..c0b115e1c6
--- /dev/null
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgniteCompute.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.restart;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.compute.JobDescriptor;
+import org.apache.ignite.compute.JobExecution;
+import org.apache.ignite.compute.JobTarget;
+import org.apache.ignite.compute.TaskDescriptor;
+import org.apache.ignite.compute.task.TaskExecution;
+import org.apache.ignite.internal.wrapper.Wrapper;
+import org.apache.ignite.internal.wrapper.Wrappers;
+import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Reference to {@link IgniteCompute} under a swappable {@link Ignite}
instance. When a restart happens, this switches to
+ * the new Ignite instance.
+ *
+ * <p>API operations on this are linearized with respect to node restarts.
Normally (except for situations when timeouts trigger), user
+ * operations will not interact with detached objects.
+ */
+// TODO: IGNITE-23166 - make returned executions restart-proof.
+class RestartProofIgniteCompute implements IgniteCompute, Wrapper {
+ private final IgniteAttachmentLock attachmentLock;
+
+ RestartProofIgniteCompute(IgniteAttachmentLock attachmentLock) {
+ this.attachmentLock = attachmentLock;
+ }
+
+ @Override
+ public <T, R> JobExecution<R> submit(JobTarget target, JobDescriptor<T, R>
descriptor, @Nullable T arg) {
+ return attachmentLock.attached(ignite ->
ignite.compute().submit(target, descriptor, arg));
+ }
+
+ @Override
+ public <T, R> R execute(JobTarget target, JobDescriptor<T, R> descriptor,
@Nullable T arg) {
+ return attachmentLock.attached(ignite ->
ignite.compute().execute(target, descriptor, arg));
+ }
+
+ @Override
+ public <T, R> CompletableFuture<R> executeAsync(JobTarget target,
JobDescriptor<T, R> descriptor, @Nullable T arg) {
+ return attachmentLock.attachedAsync(ignite ->
ignite.compute().executeAsync(target, descriptor, arg));
+ }
+
+ @Override
+ public <T, R> Map<ClusterNode, JobExecution<R>> submitBroadcast(
+ Set<ClusterNode> nodes,
+ JobDescriptor<T, R> descriptor,
+ @Nullable T arg
+ ) {
+ return attachmentLock.attached(ignite ->
ignite.compute().submitBroadcast(nodes, descriptor, arg));
+ }
+
+ @Override
+ public <T, R> TaskExecution<R> submitMapReduce(TaskDescriptor<T, R>
taskDescriptor, @Nullable T arg) {
+ return attachmentLock.attached(ignite ->
ignite.compute().submitMapReduce(taskDescriptor, arg));
+ }
+
+ @Override
+ public <T, R> R executeMapReduce(TaskDescriptor<T, R> taskDescriptor,
@Nullable T arg) {
+ return attachmentLock.attached(ignite ->
ignite.compute().executeMapReduce(taskDescriptor, arg));
+ }
+
+ @Override
+ public <T, R> CompletableFuture<R> executeMapReduceAsync(TaskDescriptor<T,
R> taskDescriptor, @Nullable T arg) {
+ return attachmentLock.attachedAsync(ignite ->
ignite.compute().executeMapReduceAsync(taskDescriptor, arg));
+ }
+
+ @Override
+ public <T> T unwrap(Class<T> classToUnwrap) {
+ return attachmentLock.attached(ignite ->
Wrappers.unwrap(ignite.compute(), classToUnwrap));
+ }
+}