This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 8df4e54430 IGNITE-23014 IgniteCompute transparency with respect to node restart (#4357)
8df4e54430 is described below

commit 8df4e54430e3b6370fac0cb1fa96eb8d6792fc86
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Mon Sep 9 18:43:29 2024 +0400

    IGNITE-23014 IgniteCompute transparency with respect to node restart (#4357)
---
 .../threading/ItComputeApiThreadingTest.java       | 97 +++++++++++++++++++++-
 .../ignite/internal/app/AsyncApiOperation.java     | 16 +++-
 .../app/ItShutDownServerApiReferencesTest.java     |  2 +-
 .../org/apache/ignite/internal/app/NoOpJob.java    | 31 +++++++
 .../ignite/internal/app/NoOpMapReduceTask.java     | 47 +++++++++++
 .../org/apache/ignite/internal/app/References.java |  9 ++
 .../ignite/internal/app/SyncApiOperation.java      | 30 ++++++-
 .../internal/restart/RestartProofIgnite.java       |  5 +-
 .../restart/RestartProofIgniteCompute.java         | 93 +++++++++++++++++++++
 9 files changed, 323 insertions(+), 7 deletions(-)

diff --git 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/threading/ItComputeApiThreadingTest.java
 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/threading/ItComputeApiThreadingTest.java
index cc6149a7b5..8edd9d955a 100644
--- 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/threading/ItComputeApiThreadingTest.java
+++ 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/threading/ItComputeApiThreadingTest.java
@@ -27,7 +27,10 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.either;
 import static org.hamcrest.Matchers.is;
 
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 import org.apache.ignite.compute.ComputeJob;
@@ -36,6 +39,11 @@ import org.apache.ignite.compute.JobDescriptor;
 import org.apache.ignite.compute.JobExecution;
 import org.apache.ignite.compute.JobExecutionContext;
 import org.apache.ignite.compute.JobTarget;
+import org.apache.ignite.compute.TaskDescriptor;
+import org.apache.ignite.compute.task.MapReduceJob;
+import org.apache.ignite.compute.task.MapReduceTask;
+import org.apache.ignite.compute.task.TaskExecution;
+import org.apache.ignite.compute.task.TaskExecutionContext;
 import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
 import org.apache.ignite.internal.compute.IgniteComputeImpl;
 import org.apache.ignite.internal.wrapper.Wrappers;
@@ -44,6 +52,7 @@ import org.apache.ignite.table.KeyValueView;
 import org.apache.ignite.table.Table;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.table.mapper.Mapper;
+import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junitpioneer.jupiter.cartesian.CartesianTest;
@@ -141,6 +150,36 @@ class ItComputeApiThreadingTest extends 
ClusterPerClassIntegrationTest {
         return Set.of(unwrapIgniteImpl(CLUSTER.node(1)).node());
     }
 
+    @CartesianTest
+    void taskExecutionFuturesCompleteInContinuationsPool(
+            @Enum ComputeMapReduceOperation mapReduceOperation,
+            @Enum TaskExecutionAsyncOperation executionOperation
+    ) {
+        TaskExecution<?> execution = 
mapReduceOperation.executeOn(computeForPublicUse());
+
+        CompletableFuture<Thread> completerFuture = 
executionOperation.executeOn(execution)
+                .thenApply(unused -> currentThread());
+
+        assertThat(completerFuture, willBe(
+                either(is(currentThread())).or(asyncContinuationPool())
+        ));
+    }
+
+    @CartesianTest
+    void 
taskExecutionFuturesFromInternalCallsAreNotResubmittedToContinuationsPool(
+            @Enum ComputeMapReduceOperation submitOperation,
+            @Enum TaskExecutionAsyncOperation executionOperation
+    ) {
+        TaskExecution<?> execution = 
submitOperation.executeOn(computeForInternalUse());
+
+        CompletableFuture<Thread> completerFuture = 
executionOperation.executeOn(execution)
+                .thenApply(unused -> currentThread());
+
+        assertThat(completerFuture, willBe(
+                either(is(currentThread())).or(anIgniteThread())
+        ));
+    }
+
     private static class NoOpJob implements ComputeJob<Void, String> {
         @Override
         public CompletableFuture<String> executeAsync(JobExecutionContext 
context, Void input) {
@@ -148,6 +187,23 @@ class ItComputeApiThreadingTest extends 
ClusterPerClassIntegrationTest {
         }
     }
 
+    private static class NoOpMapReduceTask implements MapReduceTask<Void, 
Void, String, Void> {
+        @Override
+        public CompletableFuture<List<MapReduceJob<Void, String>>> 
splitAsync(TaskExecutionContext taskContext, @Nullable Void input) {
+            return completedFuture(List.of(
+                    MapReduceJob.<Void, String>builder()
+                            
.jobDescriptor(JobDescriptor.builder(NoOpJob.class).build())
+                            .nodes(taskContext.ignite().clusterNodes())
+                            .build()
+            ));
+        }
+
+        @Override
+        public CompletableFuture<Void> reduceAsync(TaskExecutionContext 
taskContext, Map<UUID, String> results) {
+            return completedFuture(null);
+        }
+    }
+
     private enum ComputeAsyncOperation {
         EXECUTE_ASYNC(compute -> compute.executeAsync(
                 JobTarget.anyNode(justNonEntryNode()),
@@ -164,7 +220,10 @@ class ItComputeApiThreadingTest extends 
ClusterPerClassIntegrationTest {
 
                         null)),
         EXECUTE_BROADCAST_ASYNC(compute -> 
compute.executeBroadcastAsync(justNonEntryNode(), 
JobDescriptor.builder(NoOpJob.class).build(),
-                null));
+                null)),
+        EXECUTE_MAP_REDUCE_ASYNC(compute -> compute
+                
.executeMapReduceAsync(TaskDescriptor.builder(NoOpMapReduceTask.class).build(), 
null)
+        );
 
         private final Function<IgniteCompute, CompletableFuture<?>> action;
 
@@ -222,4 +281,40 @@ class ItComputeApiThreadingTest extends 
ClusterPerClassIntegrationTest {
             return action.apply((JobExecution<Object>) execution);
         }
     }
+
+    private enum ComputeMapReduceOperation {
+        SUBMIT_MAP_REDUCE(compute -> compute
+                
.submitMapReduce(TaskDescriptor.builder(NoOpMapReduceTask.class).build(), null)
+        );
+
+        private final Function<IgniteCompute, TaskExecution<?>> action;
+
+        ComputeMapReduceOperation(Function<IgniteCompute, TaskExecution<?>> 
action) {
+            this.action = action;
+        }
+
+        TaskExecution<?> executeOn(IgniteCompute compute) {
+            return action.apply(compute);
+        }
+    }
+
+    private enum TaskExecutionAsyncOperation {
+        STATES_ASYNC(execution -> execution.statesAsync()),
+        IDS_ASYNC(execution -> execution.idsAsync()),
+        RESULT_ASYNC(execution -> execution.resultAsync()),
+        STATE_ASYNC(execution -> execution.stateAsync()),
+        ID_ASYNC(execution -> execution.idAsync()),
+        CANCEL_ASYNC(execution -> execution.cancelAsync()),
+        CHANGE_PRIORITY_ASYNC(execution -> execution.changePriorityAsync(1));
+
+        private final Function<TaskExecution<Object>, CompletableFuture<?>> 
action;
+
+        TaskExecutionAsyncOperation(Function<TaskExecution<Object>, 
CompletableFuture<?>> action) {
+            this.action = action;
+        }
+
+        CompletableFuture<?> executeOn(TaskExecution<?> execution) {
+            return action.apply((TaskExecution<Object>) execution);
+        }
+    }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/AsyncApiOperation.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/AsyncApiOperation.java
index f94df5e7ce..128b964829 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/AsyncApiOperation.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/AsyncApiOperation.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.app;
 
+import static org.apache.ignite.compute.JobTarget.anyNode;
 import static org.apache.ignite.internal.app.ApiReferencesTestUtils.FULL_TUPLE;
 import static org.apache.ignite.internal.app.ApiReferencesTestUtils.KEY_TUPLE;
 import static 
org.apache.ignite.internal.app.ApiReferencesTestUtils.SELECT_IDS_QUERY;
@@ -28,8 +29,11 @@ import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
+import org.apache.ignite.compute.JobDescriptor;
+import org.apache.ignite.compute.TaskDescriptor;
 import org.apache.ignite.internal.streamer.SimplePublisher;
 import org.apache.ignite.internal.table.partition.HashPartition;
 import org.apache.ignite.sql.BatchedArguments;
@@ -128,7 +132,17 @@ enum AsyncApiOperation {
     // SQL_EXECUTE_STATEMENT_WITH_MAPPER(refs -> refs.sql.executeAsync(null, 
Mapper.of(Integer.class), refs.selectIdsStatement)),
     SQL_EXECUTE_BATCH(refs -> refs.sql.executeBatchAsync(null, UPDATE_QUERY, 
BatchedArguments.of(999))),
     SQL_EXECUTE_BATCH_STATEMENT(refs -> refs.sql.executeBatchAsync(null, 
refs.updateStatement, BatchedArguments.of(999))),
-    SQL_EXECUTE_SCRIPT(refs -> refs.sql.executeScriptAsync(SELECT_IDS_QUERY));
+    SQL_EXECUTE_SCRIPT(refs -> refs.sql.executeScriptAsync(SELECT_IDS_QUERY)),
+
+    COMPUTE_EXECUTE(refs -> refs.compute.executeAsync(
+            anyNode(refs.clusterNodes), 
JobDescriptor.builder(NoOpJob.class).build(), null
+    )),
+    COMPUTE_EXECUTE_BROADCAST(refs -> refs.compute.executeBroadcastAsync(
+            Set.copyOf(refs.clusterNodes),
+            JobDescriptor.builder(NoOpJob.class).build(),
+            null
+    )),
+    COMPUTE_EXECUTE_MAP_REDUCE(refs -> 
refs.compute.executeMapReduceAsync(TaskDescriptor.builder(NoOpMapReduceTask.class).build(),
 null));
 
     private final Function<References, CompletableFuture<?>> action;
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/ItShutDownServerApiReferencesTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/ItShutDownServerApiReferencesTest.java
index 3355a0b567..84da36a4b0 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/ItShutDownServerApiReferencesTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/ItShutDownServerApiReferencesTest.java
@@ -68,7 +68,7 @@ class ItShutDownServerApiReferencesTest extends 
ClusterPerClassIntegrationTest {
 
     @ParameterizedTest
     @EnumSource(AsyncApiOperation.class)
-    void asyncOperationsWorkAfterRestart(AsyncApiOperation operation) {
+    void asyncOperationsThrowAfterShutdown(AsyncApiOperation operation) {
         assertThat(operation.execute(beforeShutdown), 
willThrow(IgniteException.class, 10, SECONDS, "The node is already shut 
down."));
     }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/NoOpJob.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/NoOpJob.java
new file mode 100644
index 0000000000..c0cef833fd
--- /dev/null
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/NoOpJob.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.app;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobExecutionContext;
+
+class NoOpJob implements ComputeJob<Void, String> {
+    @Override
+    public CompletableFuture<String> executeAsync(JobExecutionContext context, 
Void input) {
+        return completedFuture("ok");
+    }
+}
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/NoOpMapReduceTask.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/NoOpMapReduceTask.java
new file mode 100644
index 0000000000..4babb32252
--- /dev/null
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/NoOpMapReduceTask.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.app;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.JobDescriptor;
+import org.apache.ignite.compute.task.MapReduceJob;
+import org.apache.ignite.compute.task.MapReduceTask;
+import org.apache.ignite.compute.task.TaskExecutionContext;
+import org.jetbrains.annotations.Nullable;
+
+class NoOpMapReduceTask implements MapReduceTask<Void, Void, String, Void> {
+    @Override
+    public CompletableFuture<List<MapReduceJob<Void, String>>> 
splitAsync(TaskExecutionContext taskContext, @Nullable Void input) {
+        return completedFuture(List.of(
+                MapReduceJob.<Void, String>builder()
+                        
.jobDescriptor(JobDescriptor.builder(NoOpJob.class).build())
+                        .nodes(taskContext.ignite().clusterNodes())
+                        .build()
+        ));
+    }
+
+    @Override
+    public CompletableFuture<Void> reduceAsync(TaskExecutionContext 
taskContext, Map<UUID, String> results) {
+        return completedFuture(null);
+    }
+}
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/References.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/References.java
index 16efeddb7d..8535f983c8 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/References.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/References.java
@@ -22,8 +22,11 @@ import static 
org.apache.ignite.internal.app.ApiReferencesTestUtils.SELECT_IDS_Q
 import static 
org.apache.ignite.internal.app.ApiReferencesTestUtils.TEST_TABLE_NAME;
 import static 
org.apache.ignite.internal.app.ApiReferencesTestUtils.UPDATE_QUERY;
 
+import java.util.Collection;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteServer;
+import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.sql.IgniteSql;
 import org.apache.ignite.sql.Statement;
 import org.apache.ignite.table.IgniteTables;
@@ -44,6 +47,9 @@ class References {
     final IgniteTables tables;
     final IgniteTransactions transactions;
     final IgniteSql sql;
+    final IgniteCompute compute;
+
+    final Collection<ClusterNode> clusterNodes;
 
     final Table table; // From table().
     final Table tableFromTableAsync;
@@ -69,6 +75,9 @@ class References {
         tables = ignite.tables();
         transactions = ignite.transactions();
         sql = ignite.sql();
+        compute = ignite.compute();
+
+        clusterNodes = ignite.clusterNodes();
 
         table = tables.table(TEST_TABLE_NAME);
         tableFromTableAsync = tables.tableAsync(TEST_TABLE_NAME).get(10, 
SECONDS);
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/SyncApiOperation.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/SyncApiOperation.java
index 8f6ff55d20..d187c139e5 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/SyncApiOperation.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/SyncApiOperation.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.app;
 
+import static org.apache.ignite.compute.JobTarget.anyNode;
 import static org.apache.ignite.internal.app.ApiReferencesTestUtils.FULL_TUPLE;
 import static org.apache.ignite.internal.app.ApiReferencesTestUtils.KEY_TUPLE;
 import static 
org.apache.ignite.internal.app.ApiReferencesTestUtils.SELECT_IDS_QUERY;
@@ -26,7 +27,10 @@ import static 
org.apache.ignite.internal.app.ApiReferencesTestUtils.VALUE_TUPLE;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.Consumer;
+import org.apache.ignite.compute.JobDescriptor;
+import org.apache.ignite.compute.TaskDescriptor;
 import org.apache.ignite.sql.BatchedArguments;
 import org.apache.ignite.table.mapper.Mapper;
 
@@ -124,7 +128,28 @@ enum SyncApiOperation {
     // SQL_EXECUTE_STATEMENT_WITH_MAPPER(refs -> refs.sql.execute(null, 
Mapper.of(Integer.class), refs.selectIdsStatement)),
     SQL_EXECUTE_BATCH(refs -> refs.sql.executeBatch(null, UPDATE_QUERY, 
BatchedArguments.of(999))),
     SQL_EXECUTE_BATCH_STATEMENT(refs -> refs.sql.executeBatch(null, 
refs.updateStatement, BatchedArguments.of(999))),
-    SQL_EXECUTE_SCRIPT(refs -> refs.sql.executeScript(SELECT_IDS_QUERY));
+    SQL_EXECUTE_SCRIPT(refs -> refs.sql.executeScript(SELECT_IDS_QUERY)),
+
+    COMPUTE_SUBMIT(refs -> refs.compute.submit(anyNode(refs.clusterNodes), 
JobDescriptor.builder(NoOpJob.class).build(), null)),
+    COMPUTE_EXECUTE(refs -> refs.compute.execute(anyNode(refs.clusterNodes), 
JobDescriptor.builder(NoOpJob.class).build(), null)),
+    COMPUTE_SUBMIT_BROADCAST(refs -> refs.compute.submitBroadcast(
+            Set.copyOf(refs.clusterNodes),
+            JobDescriptor.builder(NoOpJob.class).build(),
+            null
+    )),
+    COMPUTE_EXECUTE_BROADCAST(refs -> refs.compute.executeBroadcast(
+            Set.copyOf(refs.clusterNodes),
+            JobDescriptor.builder(NoOpJob.class).build(),
+            null
+    )),
+    COMPUTE_SUBMIT_MAP_REDUCE(refs -> refs.compute.submitMapReduce(
+            TaskDescriptor.builder(NoOpMapReduceTask.class).build(),
+            null
+    )),
+    COMPUTE_EXECUTE_MAP_REDUCE(refs -> refs.compute.executeMapReduce(
+            TaskDescriptor.builder(NoOpMapReduceTask.class).build(),
+            null
+    ));
 
     private final Consumer<References> action;
 
@@ -139,6 +164,7 @@ enum SyncApiOperation {
     boolean worksAfterShutdown() {
         return this == IGNITE_TABLES
                 || this == IGNITE_TRANSACTIONS
-                || this == IGNITE_SQL;
+                || this == IGNITE_SQL
+                || this == IGNITE_COMPUTE;
     }
 }
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgnite.java
 
b/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgnite.java
index d86617527e..07ebd6bd07 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgnite.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgnite.java
@@ -40,6 +40,7 @@ public class RestartProofIgnite implements Ignite, Wrapper {
     private final IgniteTables tables;
     private final IgniteTransactions transactions;
     private final IgniteSql sql;
+    private final IgniteCompute compute;
 
     /**
      * Constructor.
@@ -50,6 +51,7 @@ public class RestartProofIgnite implements Ignite, Wrapper {
         tables = new RestartProofIgniteTables(attachmentLock);
         transactions = new RestartProofIgniteTransactions(attachmentLock);
         sql = new RestartProofIgniteSql(attachmentLock);
+        compute = new RestartProofIgniteCompute(attachmentLock);
     }
 
     @Override
@@ -74,8 +76,7 @@ public class RestartProofIgnite implements Ignite, Wrapper {
 
     @Override
     public IgniteCompute compute() {
-        // TODO: IGNITE-23014 - add a wrapper.
-        return attachmentLock.attached(Ignite::compute);
+        return compute;
     }
 
     @Override
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgniteCompute.java
 
b/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgniteCompute.java
new file mode 100644
index 0000000000..c0b115e1c6
--- /dev/null
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgniteCompute.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.restart;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.compute.JobDescriptor;
+import org.apache.ignite.compute.JobExecution;
+import org.apache.ignite.compute.JobTarget;
+import org.apache.ignite.compute.TaskDescriptor;
+import org.apache.ignite.compute.task.TaskExecution;
+import org.apache.ignite.internal.wrapper.Wrapper;
+import org.apache.ignite.internal.wrapper.Wrappers;
+import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Reference to {@link IgniteCompute} under a swappable {@link Ignite} instance. When a restart happens, this switches to
+ * the new Ignite instance.
+ *
+ * <p>API operations on this are linearized with respect to node restarts. Normally (except for situations when timeouts trigger), user
+ * operations will not interact with detached objects.
+ * operations will not interact with detached objects.
+ */
+// TODO: IGNITE-23166 - make returned executions restart-proof.
+class RestartProofIgniteCompute implements IgniteCompute, Wrapper {
+    private final IgniteAttachmentLock attachmentLock;
+
+    RestartProofIgniteCompute(IgniteAttachmentLock attachmentLock) {
+        this.attachmentLock = attachmentLock;
+    }
+
+    @Override
+    public <T, R> JobExecution<R> submit(JobTarget target, JobDescriptor<T, R> 
descriptor, @Nullable T arg) {
+        return attachmentLock.attached(ignite -> 
ignite.compute().submit(target, descriptor, arg));
+    }
+
+    @Override
+    public <T, R> R execute(JobTarget target, JobDescriptor<T, R> descriptor, 
@Nullable T arg) {
+        return attachmentLock.attached(ignite -> 
ignite.compute().execute(target, descriptor, arg));
+    }
+
+    @Override
+    public <T, R> CompletableFuture<R> executeAsync(JobTarget target, 
JobDescriptor<T, R> descriptor, @Nullable T arg) {
+        return attachmentLock.attachedAsync(ignite -> 
ignite.compute().executeAsync(target, descriptor, arg));
+    }
+
+    @Override
+    public <T, R> Map<ClusterNode, JobExecution<R>> submitBroadcast(
+            Set<ClusterNode> nodes,
+            JobDescriptor<T, R> descriptor,
+            @Nullable T arg
+    ) {
+        return attachmentLock.attached(ignite -> 
ignite.compute().submitBroadcast(nodes, descriptor, arg));
+    }
+
+    @Override
+    public <T, R> TaskExecution<R> submitMapReduce(TaskDescriptor<T, R> 
taskDescriptor, @Nullable T arg) {
+        return attachmentLock.attached(ignite -> 
ignite.compute().submitMapReduce(taskDescriptor, arg));
+    }
+
+    @Override
+    public <T, R> R executeMapReduce(TaskDescriptor<T, R> taskDescriptor, 
@Nullable T arg) {
+        return attachmentLock.attached(ignite -> 
ignite.compute().executeMapReduce(taskDescriptor, arg));
+    }
+
+    @Override
+    public <T, R> CompletableFuture<R> executeMapReduceAsync(TaskDescriptor<T, 
R> taskDescriptor, @Nullable T arg) {
+        return attachmentLock.attachedAsync(ignite -> 
ignite.compute().executeMapReduceAsync(taskDescriptor, arg));
+    }
+
+    @Override
+    public <T> T unwrap(Class<T> classToUnwrap) {
+        return attachmentLock.attached(ignite -> 
Wrappers.unwrap(ignite.compute(), classToUnwrap));
+    }
+}

Reply via email to