[
https://issues.apache.org/jira/browse/IGNITE-23688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17899073#comment-17899073
]
Evgeny Stanilovsky edited comment on IGNITE-23688 at 11/18/24 8:29 AM:
-----------------------------------------------------------------------
Ok lets check a bit different case:
{code:java}
@Test
void cancelComputeSubmitMapReduceWithCancelHandle() {
Ignite entryNode = node(0);
CancelHandle cancelHandle = CancelHandle.create();
TaskExecution<Void> execution = entryNode.compute()
.submitMapReduce(TaskDescriptor.builder(InfiniteMapReduceTask.class).build(),
null);
assertThat(execution.cancelAsync(), willBe(true));
await().atMost(10, TimeUnit.SECONDS).until(() ->
execution.resultAsync().isDone());
}
{code}
{code:java}
public class InfiniteMapReduceTask implements MapReduceTask<Void, Void, Void,
Void> {
@Override
public CompletableFuture<List<MapReduceJob<Void, Void>>>
splitAsync(TaskExecutionContext taskContext, Void input) {
return completedFuture(List.of(
MapReduceJob.<Void, Void>builder()
.jobDescriptor(JobDescriptor.builder(InfiniteMapReduceJob.class).build())
.nodes(taskContext.ignite().clusterNodes())
.build()
));
}
@Override
public CompletableFuture<Void> reduceAsync(TaskExecutionContext
taskContext, Map<UUID, Void> results) {
return completedFuture(null);
}
private static class InfiniteMapReduceJob implements ComputeJob<Void, Void>
{
@Override
public CompletableFuture<Void> executeAsync(JobExecutionContext
context, Void input) {
return new CompletableFuture<>();
}
}
}
{code}
this valid case throws exception, if we call to run N times - it will timeouted.
I can fill different issue for such a case
failure trace:
{code:java}
org.apache.ignite.internal.compute.state.IllegalJobStatusTransition: Failed to
transition job ac349c85-a2b9-4b20-a5a2-2882d54cfff9 from status COMPLETED to
status CANCELING
at
org.apache.ignite.internal.compute.state.InMemoryComputeStateMachine.lambda$cancelingJob$0(InMemoryComputeStateMachine.java:124)
~[main/:?]
at
org.apache.ignite.internal.compute.state.InMemoryComputeStateMachine.lambda$changeJobStatus$2(InMemoryComputeStateMachine.java:141)
~[main/:?]
at
org.apache.ignite.internal.compute.state.InMemoryComputeStateMachine.lambda$changeStatus$3(InMemoryComputeStateMachine.java:158)
~[main/:?]
at
java.base/java.util.concurrent.ConcurrentHashMap.computeIfPresent(ConcurrentHashMap.java:1822)
~[?:?]
at
org.apache.ignite.internal.compute.state.InMemoryComputeStateMachine.changeStatus(InMemoryComputeStateMachine.java:158)
~[main/:?]
{code}
was (Author: zstan):
Ok lets check a bit different case:
{code:java}
@Test
void cancelComputeSubmitMapReduceWithCancelHandle() {
Ignite entryNode = node(0);
CancelHandle cancelHandle = CancelHandle.create();
TaskExecution<Void> execution = entryNode.compute()
.submitMapReduce(TaskDescriptor.builder(InfiniteMapReduceTask.class).build(),
null);
assertThat(execution.cancelAsync(), willBe(true));
await().atMost(10, TimeUnit.SECONDS).until(() ->
execution.resultAsync().isDone());
}
{code}
{code:java}
public class InfiniteMapReduceTask implements MapReduceTask<Void, Void, Void,
Void> {
@Override
public CompletableFuture<List<MapReduceJob<Void, Void>>>
splitAsync(TaskExecutionContext taskContext, Void input) {
return completedFuture(List.of(
MapReduceJob.<Void, Void>builder()
.jobDescriptor(JobDescriptor.builder(InfiniteMapReduceJob.class).build())
.nodes(taskContext.ignite().clusterNodes())
.build()
));
}
@Override
public CompletableFuture<Void> reduceAsync(TaskExecutionContext
taskContext, Map<UUID, Void> results) {
return completedFuture(null);
}
private static class InfiniteMapReduceJob implements ComputeJob<Void, Void>
{
@Override
public CompletableFuture<Void> executeAsync(JobExecutionContext
context, Void input) {
return new CompletableFuture<>();
}
}
}
{code}
this valid case throws exception, if we call to run N times - it will timeouted.
I can fill different issue for such a case
> Compute execution assertion in embedded mode
> --------------------------------------------
>
> Key: IGNITE-23688
> URL: https://issues.apache.org/jira/browse/IGNITE-23688
> Project: Ignite
> Issue Type: Bug
> Components: compute
> Affects Versions: 3.0.0-beta1
> Reporter: Evgeny Stanilovsky
> Assignee: Vadim Pakhnushev
> Priority: Major
> Labels: ignite-3
>
> I failed to run near test in embedded mode:
> append into ItComputeBaseTest
> {code:java}
> @ParameterizedTest(name = "local: {0}")
> @ValueSource(booleans = {true, false})
> void cancelComputeSubmitWithCancelHandle(boolean local) {
> Ignite entryNode = node(0);
> Ignite executeNode = local ? node(0) : node(1);
> JobDescriptor<Long, Void> job =
> JobDescriptor.builder(SilentSleepJob.class)
> .options(executionOptions).units(units()).build();
> JobExecution<Void> execution =
> entryNode.compute().submit(JobTarget.node(clusterNode(executeNode)), job,
> 100L);
> await().atMost(10, TimeUnit.SECONDS).until(() ->
> execution.cancelAsync().isDone());
> await().atMost(10, TimeUnit.SECONDS).until(() ->
> execution.resultAsync().isDone());
> }
> {code}
> and
> into : package org.apache.ignite.internal.compute
> {code:java}
> public class SilentSleepJob implements ComputeJob<Long, Void> {
> @Override
> public CompletableFuture<Void> executeAsync(JobExecutionContext
> jobExecutionContext, Long timeout) {
> try {
> TimeUnit.SECONDS.sleep(timeout);
> } catch (InterruptedException e) {
> // no op.
> }
> return null;
> }
> }
> {code}
> got:
> {noformat}
> org.apache.ignite.internal.compute.state.IllegalJobStatusTransition: Failed
> to transition job b692660b-e3ec-4749-80b8-ef1f39d9b7d7 from status CANCELED
> to status CANCELING
> at
> org.apache.ignite.internal.compute.state.InMemoryComputeStateMachine.lambda$cancelingJob$0(InMemoryComputeStateMachine.java:124)
> ~[main/:?]
> at
> org.apache.ignite.internal.compute.state.InMemoryComputeStateMachine.lambda$changeJobStatus$2(InMemoryComputeStateMachine.java:141)
> ~[main/:?]
> at
> org.apache.ignite.internal.compute.state.InMemoryComputeStateMachine.lambda$changeStatus$3(InMemoryComputeStateMachine.java:158)
> ~[main/:?]
> at
> java.base/java.util.concurrent.ConcurrentHashMap.computeIfPresent(ConcurrentHashMap.java:1822)
> ~[?:?]
> at
> org.apache.ignite.internal.compute.state.InMemoryComputeStateMachine.changeStatus(InMemoryComputeStateMachine.java:158)
> ~[main/:?]
> at
> org.apache.ignite.internal.compute.state.InMemoryComputeStateMachine.changeJobStatus(InMemoryComputeStateMachine.java:139)
> ~[main/:?]
> at
> org.apache.ignite.internal.compute.state.InMemoryComputeStateMachine.cancelingJob(InMemoryComputeStateMachine.java:116)
> ~[main/:?]
> at
> org.apache.ignite.internal.compute.queue.QueueExecutionImpl.cancel(QueueExecutionImpl.java:94)
> ~[main/:?]
> at
> org.apache.ignite.internal.compute.executor.JobExecutionInternal.cancel(JobExecutionInternal.java:68)
> ~[main/:?]
> at
> java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:680)
> ~[?:?]
> at
> java.base/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:658)
> ~[?:?]
> at
> java.base/java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2100)
> ~[?:?]
> at
> org.apache.ignite.internal.compute.DelegatingJobExecution.cancelAsync(DelegatingJobExecution.java:52)
> ~[main/:?]
> at
> org.apache.ignite.internal.compute.ExecutionManager.cancelAsync(ExecutionManager.java:148)
> ~[main/:?]
> {noformat}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)