[ 
https://issues.apache.org/jira/browse/IGNITE-23688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17899073#comment-17899073
 ] 

Evgeny Stanilovsky edited comment on IGNITE-23688 at 11/18/24 8:29 AM:
-----------------------------------------------------------------------

Ok lets check a bit different case:


{code:java}
    @Test
    void cancelComputeSubmitMapReduceWithCancelHandle() {
        Ignite entryNode = node(0);

        CancelHandle cancelHandle = CancelHandle.create();

        TaskExecution<Void> execution = entryNode.compute()
                
.submitMapReduce(TaskDescriptor.builder(InfiniteMapReduceTask.class).build(), 
null);

        assertThat(execution.cancelAsync(), willBe(true));

        await().atMost(10, TimeUnit.SECONDS).until(() -> 
execution.resultAsync().isDone());
    }
{code}



{code:java}
public class InfiniteMapReduceTask implements MapReduceTask<Void, Void, Void, 
Void> {
    @Override
    public CompletableFuture<List<MapReduceJob<Void, Void>>> 
splitAsync(TaskExecutionContext taskContext, Void input) {
        return completedFuture(List.of(
                MapReduceJob.<Void, Void>builder()
                        
.jobDescriptor(JobDescriptor.builder(InfiniteMapReduceJob.class).build())
                        .nodes(taskContext.ignite().clusterNodes())
                        .build()
        ));
    }

    @Override
    public CompletableFuture<Void> reduceAsync(TaskExecutionContext 
taskContext, Map<UUID, Void> results) {
        return completedFuture(null);
    }

    private static class InfiniteMapReduceJob implements ComputeJob<Void, Void> 
{
        @Override
        public CompletableFuture<Void> executeAsync(JobExecutionContext 
context, Void input) {
            return new CompletableFuture<>();
        }
    }
}
{code}

this valid case throws exception, if we call to run N times - it will timeouted.

I can fill different issue for such a case

failure trace:

{code:java}
org.apache.ignite.internal.compute.state.IllegalJobStatusTransition: Failed to 
transition job ac349c85-a2b9-4b20-a5a2-2882d54cfff9 from status COMPLETED to 
status CANCELING
        at 
org.apache.ignite.internal.compute.state.InMemoryComputeStateMachine.lambda$cancelingJob$0(InMemoryComputeStateMachine.java:124)
 ~[main/:?]
        at 
org.apache.ignite.internal.compute.state.InMemoryComputeStateMachine.lambda$changeJobStatus$2(InMemoryComputeStateMachine.java:141)
 ~[main/:?]
        at 
org.apache.ignite.internal.compute.state.InMemoryComputeStateMachine.lambda$changeStatus$3(InMemoryComputeStateMachine.java:158)
 ~[main/:?]
        at 
java.base/java.util.concurrent.ConcurrentHashMap.computeIfPresent(ConcurrentHashMap.java:1822)
 ~[?:?]
        at 
org.apache.ignite.internal.compute.state.InMemoryComputeStateMachine.changeStatus(InMemoryComputeStateMachine.java:158)
 ~[main/:?]
{code}



was (Author: zstan):
Ok lets check a bit different case:


{code:java}
    @Test
    void cancelComputeSubmitMapReduceWithCancelHandle() {
        Ignite entryNode = node(0);

        CancelHandle cancelHandle = CancelHandle.create();

        TaskExecution<Void> execution = entryNode.compute()
                
.submitMapReduce(TaskDescriptor.builder(InfiniteMapReduceTask.class).build(), 
null);

        assertThat(execution.cancelAsync(), willBe(true));

        await().atMost(10, TimeUnit.SECONDS).until(() -> 
execution.resultAsync().isDone());
    }
{code}



{code:java}
public class InfiniteMapReduceTask implements MapReduceTask<Void, Void, Void, 
Void> {
    @Override
    public CompletableFuture<List<MapReduceJob<Void, Void>>> 
splitAsync(TaskExecutionContext taskContext, Void input) {
        return completedFuture(List.of(
                MapReduceJob.<Void, Void>builder()
                        
.jobDescriptor(JobDescriptor.builder(InfiniteMapReduceJob.class).build())
                        .nodes(taskContext.ignite().clusterNodes())
                        .build()
        ));
    }

    @Override
    public CompletableFuture<Void> reduceAsync(TaskExecutionContext 
taskContext, Map<UUID, Void> results) {
        return completedFuture(null);
    }

    private static class InfiniteMapReduceJob implements ComputeJob<Void, Void> 
{
        @Override
        public CompletableFuture<Void> executeAsync(JobExecutionContext 
context, Void input) {
            return new CompletableFuture<>();
        }
    }
}
{code}

this valid case throws exception, if we call to run N times - it will timeouted.

I can fill different issue for such a case

> Compute execution assertion in embedded mode
> --------------------------------------------
>
>                 Key: IGNITE-23688
>                 URL: https://issues.apache.org/jira/browse/IGNITE-23688
>             Project: Ignite
>          Issue Type: Bug
>          Components: compute
>    Affects Versions: 3.0.0-beta1
>            Reporter: Evgeny Stanilovsky
>            Assignee: Vadim Pakhnushev
>            Priority: Major
>              Labels: ignite-3
>
> I failed to run near test in embedded mode:
> append into ItComputeBaseTest
> {code:java}
>     @ParameterizedTest(name = "local: {0}")
>     @ValueSource(booleans = {true, false})
>     void cancelComputeSubmitWithCancelHandle(boolean local) {
>         Ignite entryNode = node(0);
>         Ignite executeNode = local ? node(0) : node(1);
>         JobDescriptor<Long, Void> job = 
> JobDescriptor.builder(SilentSleepJob.class)
>                 .options(executionOptions).units(units()).build();
>         JobExecution<Void> execution = 
> entryNode.compute().submit(JobTarget.node(clusterNode(executeNode)), job, 
> 100L);
>         await().atMost(10, TimeUnit.SECONDS).until(() -> 
> execution.cancelAsync().isDone());
>         await().atMost(10, TimeUnit.SECONDS).until(() -> 
> execution.resultAsync().isDone());
>     }
> {code}
> and 
> into : package org.apache.ignite.internal.compute
> {code:java}
> public class SilentSleepJob implements ComputeJob<Long, Void> {
>     @Override
>     public CompletableFuture<Void> executeAsync(JobExecutionContext 
> jobExecutionContext, Long timeout) {
>         try {
>             TimeUnit.SECONDS.sleep(timeout);
>         } catch (InterruptedException e) {
>             // no op.
>         }
>         return null;
>     }
> }
> {code}
> got:
> {noformat}
> org.apache.ignite.internal.compute.state.IllegalJobStatusTransition: Failed 
> to transition job b692660b-e3ec-4749-80b8-ef1f39d9b7d7 from status CANCELED 
> to status CANCELING
>       at 
> org.apache.ignite.internal.compute.state.InMemoryComputeStateMachine.lambda$cancelingJob$0(InMemoryComputeStateMachine.java:124)
>  ~[main/:?]
>       at 
> org.apache.ignite.internal.compute.state.InMemoryComputeStateMachine.lambda$changeJobStatus$2(InMemoryComputeStateMachine.java:141)
>  ~[main/:?]
>       at 
> org.apache.ignite.internal.compute.state.InMemoryComputeStateMachine.lambda$changeStatus$3(InMemoryComputeStateMachine.java:158)
>  ~[main/:?]
>       at 
> java.base/java.util.concurrent.ConcurrentHashMap.computeIfPresent(ConcurrentHashMap.java:1822)
>  ~[?:?]
>       at 
> org.apache.ignite.internal.compute.state.InMemoryComputeStateMachine.changeStatus(InMemoryComputeStateMachine.java:158)
>  ~[main/:?]
>       at 
> org.apache.ignite.internal.compute.state.InMemoryComputeStateMachine.changeJobStatus(InMemoryComputeStateMachine.java:139)
>  ~[main/:?]
>       at 
> org.apache.ignite.internal.compute.state.InMemoryComputeStateMachine.cancelingJob(InMemoryComputeStateMachine.java:116)
>  ~[main/:?]
>       at 
> org.apache.ignite.internal.compute.queue.QueueExecutionImpl.cancel(QueueExecutionImpl.java:94)
>  ~[main/:?]
>       at 
> org.apache.ignite.internal.compute.executor.JobExecutionInternal.cancel(JobExecutionInternal.java:68)
>  ~[main/:?]
>       at 
> java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:680)
>  ~[?:?]
>       at 
> java.base/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:658)
>  ~[?:?]
>       at 
> java.base/java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2100)
>  ~[?:?]
>       at 
> org.apache.ignite.internal.compute.DelegatingJobExecution.cancelAsync(DelegatingJobExecution.java:52)
>  ~[main/:?]
>       at 
> org.apache.ignite.internal.compute.ExecutionManager.cancelAsync(ExecutionManager.java:148)
>  ~[main/:?]
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to