This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 7cc8a3e  IGNITE-16705 Resolve Compute futures that had no chance to be executed on node stop (#743)
7cc8a3e is described below

commit 7cc8a3eb85abc3ec523623945215b58ce8855e4b
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Mar 25 17:25:25 2022 +0400

    IGNITE-16705 Resolve Compute futures that had no chance to be executed on node stop (#743)
---
 .../internal/compute/ComputeComponentImpl.java     | 17 +++++-
 .../internal/compute/ComputeComponentImplTest.java | 25 ++++++--
 .../ignite/internal/future/InFlightFutures.java    | 59 +++++++++++++++++++
 .../internal/future/InFlightFuturesTest.java       | 66 ++++++++++++++++++++++
 4 files changed, 161 insertions(+), 6 deletions(-)

diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
index 3b7e8a4..5a92415 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
@@ -34,6 +34,7 @@ import org.apache.ignite.compute.JobExecutionContext;
 import org.apache.ignite.configuration.schemas.compute.ComputeConfiguration;
 import org.apache.ignite.internal.compute.message.ExecuteRequest;
 import org.apache.ignite.internal.compute.message.ExecuteResponse;
+import org.apache.ignite.internal.future.InFlightFutures;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -68,6 +69,8 @@ public class ComputeComponentImpl implements ComputeComponent 
{
     /** Prevents double stopping the component. */
     private final AtomicBoolean stopGuard = new AtomicBoolean();
 
+    private final InFlightFutures inFlightFutures = new InFlightFutures();
+
     /**
      * Creates a new instance.
      */
@@ -100,6 +103,13 @@ public class ComputeComponentImpl implements 
ComputeComponent {
     private <R> CompletableFuture<R> doExecuteLocally(Class<? extends 
ComputeJob<R>> jobClass, Object[] args) {
         assert jobExecutorService != null : "Not started yet!";
 
+        CompletableFuture<R> future = startLocalExecution(jobClass, args);
+        inFlightFutures.registerFuture(future);
+
+        return future;
+    }
+
+    private <R> CompletableFuture<R> startLocalExecution(Class<? extends 
ComputeJob<R>> jobClass, Object[] args) {
         try {
             return CompletableFuture.supplyAsync(() -> executeJob(jobClass, 
args), jobExecutorService);
         } catch (RejectedExecutionException e) {
@@ -110,6 +120,7 @@ public class ComputeComponentImpl implements 
ComputeComponent {
     private <R> R executeJob(Class<? extends ComputeJob<R>> jobClass, Object[] 
args) {
         ComputeJob<R> job = instantiateJob(jobClass);
         JobExecutionContext context = new JobExecutionContextImpl(ignite);
+        // TODO: IGNITE-16746 - translate NodeStoppingException to a public 
exception
         return job.execute(context, args);
     }
 
@@ -157,8 +168,10 @@ public class ComputeComponentImpl implements 
ComputeComponent {
                 .args(args)
                 .build();
 
-        return messagingService.invoke(remoteNode, executeRequest, 
NETWORK_TIMEOUT_MILLIS)
+        CompletableFuture<R> future = messagingService.invoke(remoteNode, 
executeRequest, NETWORK_TIMEOUT_MILLIS)
                 .thenCompose(message -> 
resultFromExecuteResponse((ExecuteResponse) message));
+        inFlightFutures.registerFuture(future);
+        return future;
     }
 
     @SuppressWarnings("unchecked")
@@ -246,6 +259,8 @@ public class ComputeComponentImpl implements 
ComputeComponent {
         busyLock.block();
 
         IgniteUtils.shutdownAndAwaitTermination(jobExecutorService, 
stopTimeoutMillis(), TimeUnit.MILLISECONDS);
+
+        inFlightFutures.cancelInFlightFutures();
     }
 
     long stopTimeoutMillis() {
diff --git a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
index b9e7cbd..4465890 100644
--- a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
+++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
@@ -62,7 +62,6 @@ import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.NetworkMessageHandler;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -354,10 +353,8 @@ class ComputeComponentImplTest {
         when(threadPoolSizeValue.value()).thenReturn(1);
     }
 
-    // TODO: IGNITE-16705 - enable this test
     @Test
-    @Disabled("IGNITE-16705")
-    void 
taskDropByExecutorServiceDueToStopCausesCancellationExceptionToBeReturnedViaFuture()
 throws Exception {
+    void stopCausesCancellationExceptionOnLocalExecution() throws Exception {
         restrictPoolSizeTo1();
 
         computeComponent = new ComputeComponentImpl(ignite, messagingService, 
computeConfiguration) {
@@ -382,7 +379,25 @@ class ComputeComponentImplTest {
         Object result = resultFuture.get(3, TimeUnit.SECONDS);
 
         assertThat(result, is(instanceOf(CancellationException.class)));
-        assertThat(((CancellationException) result).getMessage(), 
is("Cancelled due to node stop"));
+    }
+
+    @Test
+    void stopCausesCancellationExceptionOnRemoteExecution() throws Exception {
+        respondWithIncompleteFutureWhenExecuteRequestIsSent();
+
+        CompletableFuture<Object> resultFuture = 
computeComponent.executeRemotely(remoteNode, SimpleJob.class)
+                .handle((res, ex) -> ex != null ? ex : res);
+
+        computeComponent.stop();
+
+        Object result = resultFuture.get(3, TimeUnit.SECONDS);
+
+        assertThat(result, is(instanceOf(CancellationException.class)));
+    }
+
+    private void respondWithIncompleteFutureWhenExecuteRequestIsSent() {
+        when(messagingService.invoke(any(ClusterNode.class), 
any(ExecuteRequest.class), anyLong()))
+                .thenReturn(new CompletableFuture<>());
     }
 
     @Test
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/future/InFlightFutures.java b/modules/core/src/main/java/org/apache/ignite/internal/future/InFlightFutures.java
new file mode 100644
index 0000000..c9aa19c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/future/InFlightFutures.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.future;
+
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Maintains a collection of in-flight {@link CompletableFuture}s (i.e. 
futures that are not yet completed) to later
+ * have a possibility to process them somehow (for example, cancel them all if 
we know for sure that they will
+ * never be completed).
+ */
+public class InFlightFutures implements Iterable<CompletableFuture<?>> {
+    private final Set<CompletableFuture<?>> inFlightFutures = ConcurrentHashMap.newKeySet();
+
+    /**
+     * Registers a future in the in-flight futures collection. When it completes (either normally or exceptionally),
+     * it will be removed from the collection; a future that is already completed is not retained.
+     *
+     * @param future the future to register
+     */
+    public void registerFuture(CompletableFuture<?> future) {
+        // Add first: if the future is already completed, the callback below runs immediately and must find it in the set.
+        inFlightFutures.add(future);
+        future.whenComplete((result, ex) -> inFlightFutures.remove(future));
+    }
+
+    /**
+     * Cancels all in-flight futures (that is, the futures that are not yet completed).
+     */
+    public void cancelInFlightFutures() {
+        for (CompletableFuture<?> future : this) {
+            future.cancel(true);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Iterator<CompletableFuture<?>> iterator() {
+        return inFlightFutures.iterator();
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/future/InFlightFuturesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/future/InFlightFuturesTest.java
new file mode 100644
index 0000000..02f19b7
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/future/InFlightFuturesTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.future;
+
+import static java.util.Collections.singleton;
+import static java.util.stream.Collectors.toSet;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.Matchers.is;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.StreamSupport;
+import org.junit.jupiter.api.Test;
+
+class InFlightFuturesTest {
+    private final InFlightFutures inFlightFutures = new InFlightFutures();
+
+    @Test
+    void addsFutureToInFlightSetOnRegistration() {
+        CompletableFuture<Object> incompleteFuture = new CompletableFuture<>();
+
+        inFlightFutures.registerFuture(incompleteFuture);
+
+        assertThat(currentFutures(inFlightFutures), 
is(singleton(incompleteFuture)));
+    }
+
+    private Set<CompletableFuture<?>> currentFutures(InFlightFutures 
inFlightFutures) {
+        return StreamSupport.stream(inFlightFutures.spliterator(), 
false).collect(toSet());
+    }
+
+    @Test
+    void removesFutureFromInFlightSetOnSuccessfulCompletion() {
+        CompletableFuture<Object> future = new CompletableFuture<>();
+        inFlightFutures.registerFuture(future);
+
+        future.complete("ok");
+
+        assertThat(inFlightFutures, is(emptyIterable()));
+    }
+
+    @Test
+    void removesFutureFromInFlightSetOnExceptionalCompletion() {
+        CompletableFuture<Object> future = new CompletableFuture<>();
+        inFlightFutures.registerFuture(future);
+
+        future.completeExceptionally(new Exception());
+
+        assertThat(inFlightFutures, is(emptyIterable()));
+    }
+}

Reply via email to