This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 7cc8a3e IGNITE-16705 Resolve Compute futures that had no chance to be
executed on node stop (#743)
7cc8a3e is described below
commit 7cc8a3eb85abc3ec523623945215b58ce8855e4b
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Mar 25 17:25:25 2022 +0400
IGNITE-16705 Resolve Compute futures that had no chance to be executed on
node stop (#743)
---
.../internal/compute/ComputeComponentImpl.java | 17 +++++-
.../internal/compute/ComputeComponentImplTest.java | 25 ++++++--
.../ignite/internal/future/InFlightFutures.java | 59 +++++++++++++++++++
.../internal/future/InFlightFuturesTest.java | 66 ++++++++++++++++++++++
4 files changed, 161 insertions(+), 6 deletions(-)
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
index 3b7e8a4..5a92415 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
@@ -34,6 +34,7 @@ import org.apache.ignite.compute.JobExecutionContext;
import org.apache.ignite.configuration.schemas.compute.ComputeConfiguration;
import org.apache.ignite.internal.compute.message.ExecuteRequest;
import org.apache.ignite.internal.compute.message.ExecuteResponse;
+import org.apache.ignite.internal.future.InFlightFutures;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -68,6 +69,8 @@ public class ComputeComponentImpl implements ComputeComponent
{
/** Prevents double stopping the component. */
private final AtomicBoolean stopGuard = new AtomicBoolean();
+ private final InFlightFutures inFlightFutures = new InFlightFutures();
+
/**
* Creates a new instance.
*/
@@ -100,6 +103,13 @@ public class ComputeComponentImpl implements
ComputeComponent {
private <R> CompletableFuture<R> doExecuteLocally(Class<? extends
ComputeJob<R>> jobClass, Object[] args) {
assert jobExecutorService != null : "Not started yet!";
+ CompletableFuture<R> future = startLocalExecution(jobClass, args);
+ inFlightFutures.registerFuture(future);
+
+ return future;
+ }
+
+ private <R> CompletableFuture<R> startLocalExecution(Class<? extends
ComputeJob<R>> jobClass, Object[] args) {
try {
return CompletableFuture.supplyAsync(() -> executeJob(jobClass,
args), jobExecutorService);
} catch (RejectedExecutionException e) {
@@ -110,6 +120,7 @@ public class ComputeComponentImpl implements
ComputeComponent {
private <R> R executeJob(Class<? extends ComputeJob<R>> jobClass, Object[]
args) {
ComputeJob<R> job = instantiateJob(jobClass);
JobExecutionContext context = new JobExecutionContextImpl(ignite);
+ // TODO: IGNITE-16746 - translate NodeStoppingException to a public
exception
return job.execute(context, args);
}
@@ -157,8 +168,10 @@ public class ComputeComponentImpl implements
ComputeComponent {
.args(args)
.build();
- return messagingService.invoke(remoteNode, executeRequest,
NETWORK_TIMEOUT_MILLIS)
+ CompletableFuture<R> future = messagingService.invoke(remoteNode,
executeRequest, NETWORK_TIMEOUT_MILLIS)
.thenCompose(message ->
resultFromExecuteResponse((ExecuteResponse) message));
+ inFlightFutures.registerFuture(future);
+ return future;
}
@SuppressWarnings("unchecked")
@@ -246,6 +259,8 @@ public class ComputeComponentImpl implements
ComputeComponent {
busyLock.block();
IgniteUtils.shutdownAndAwaitTermination(jobExecutorService,
stopTimeoutMillis(), TimeUnit.MILLISECONDS);
+
+ inFlightFutures.cancelInFlightFutures();
}
long stopTimeoutMillis() {
diff --git
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
index b9e7cbd..4465890 100644
---
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
+++
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
@@ -62,7 +62,6 @@ import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NetworkMessageHandler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -354,10 +353,8 @@ class ComputeComponentImplTest {
when(threadPoolSizeValue.value()).thenReturn(1);
}
- // TODO: IGNITE-16705 - enable this test
@Test
- @Disabled("IGNITE-16705")
- void
taskDropByExecutorServiceDueToStopCausesCancellationExceptionToBeReturnedViaFuture()
throws Exception {
+ void stopCausesCancellationExceptionOnLocalExecution() throws Exception {
restrictPoolSizeTo1();
computeComponent = new ComputeComponentImpl(ignite, messagingService,
computeConfiguration) {
@@ -382,7 +379,25 @@ class ComputeComponentImplTest {
Object result = resultFuture.get(3, TimeUnit.SECONDS);
assertThat(result, is(instanceOf(CancellationException.class)));
- assertThat(((CancellationException) result).getMessage(),
is("Cancelled due to node stop"));
+ }
+
+ @Test
+ void stopCausesCancellationExceptionOnRemoteExecution() throws Exception {
+ respondWithIncompleteFutureWhenExecuteRequestIsSent();
+
+ CompletableFuture<Object> resultFuture =
computeComponent.executeRemotely(remoteNode, SimpleJob.class)
+ .handle((res, ex) -> ex != null ? ex : res);
+
+ computeComponent.stop();
+
+ Object result = resultFuture.get(3, TimeUnit.SECONDS);
+
+ assertThat(result, is(instanceOf(CancellationException.class)));
+ }
+
+ private void respondWithIncompleteFutureWhenExecuteRequestIsSent() {
+ when(messagingService.invoke(any(ClusterNode.class),
any(ExecuteRequest.class), anyLong()))
+ .thenReturn(new CompletableFuture<>());
}
@Test
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/future/InFlightFutures.java
b/modules/core/src/main/java/org/apache/ignite/internal/future/InFlightFutures.java
new file mode 100644
index 0000000..c9aa19c
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/future/InFlightFutures.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.future;
+
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Maintains a collection of in-flight {@link CompletableFuture}s (i.e.
futures that are not yet completed) to later
+ * have a possibility to process them somehow (for example, cancel them all if
we know for sure that they will
+ * never be completed).
+ */
+public class InFlightFutures implements Iterable<CompletableFuture<?>> {
+ private final Set<CompletableFuture<?>> inFlightFutures =
ConcurrentHashMap.newKeySet();
+
+ /**
+ * Registers a future in the in-flight futures collection. When it
completes (either normally or exceptionally),
+ * it will be removed from the collection.
+ *
+ * @param future the future to register
+ */
+ public void registerFuture(CompletableFuture<?> future) {
+ future.whenComplete((result, ex) -> inFlightFutures.remove(future));
+
+ inFlightFutures.add(future);
+ }
+
+ /**
+ * Cancels all in-flight futures (that is, the futures that are not yet
completed).
+ */
+ public void cancelInFlightFutures() {
+ for (CompletableFuture<?> future : this) {
+ future.cancel(true);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Iterator<CompletableFuture<?>> iterator() {
+ return inFlightFutures.iterator();
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/future/InFlightFuturesTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/future/InFlightFuturesTest.java
new file mode 100644
index 0000000..02f19b7
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/future/InFlightFuturesTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.future;
+
+import static java.util.Collections.singleton;
+import static java.util.stream.Collectors.toSet;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.Matchers.is;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.StreamSupport;
+import org.junit.jupiter.api.Test;
+
+class InFlightFuturesTest {
+ private final InFlightFutures inFlightFutures = new InFlightFutures();
+
+ @Test
+ void addsFutureToInFlightSetOnRegistration() {
+ CompletableFuture<Object> incompleteFuture = new CompletableFuture<>();
+
+ inFlightFutures.registerFuture(incompleteFuture);
+
+ assertThat(currentFutures(inFlightFutures),
is(singleton(incompleteFuture)));
+ }
+
+ private Set<CompletableFuture<?>> currentFutures(InFlightFutures
inFlightFutures) {
+ return StreamSupport.stream(inFlightFutures.spliterator(),
false).collect(toSet());
+ }
+
+ @Test
+ void removesFutureFromInFlightSetOnSuccessfulCompletion() {
+ CompletableFuture<Object> future = new CompletableFuture<>();
+ inFlightFutures.registerFuture(future);
+
+ future.complete("ok");
+
+ assertThat(inFlightFutures, is(emptyIterable()));
+ }
+
+ @Test
+ void removesFutureFromInFlightSetOnExceptionalCompletion() {
+ CompletableFuture<Object> future = new CompletableFuture<>();
+ inFlightFutures.registerFuture(future);
+
+ future.completeExceptionally(new Exception());
+
+ assertThat(inFlightFutures, is(emptyIterable()));
+ }
+}