This is an automated email from the ASF dual-hosted git repository.

apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new f2d7d51b3a IGNITE-20842 Introduce Job repeatable mechanism (#2956)
f2d7d51b3a is described below

commit f2d7d51b3aee81a1b08164ad2d8f14d674993bf0
Author: Vadim Pakhnushev <8614891+valep...@users.noreply.github.com>
AuthorDate: Mon Dec 18 14:59:02 2023 +0300

    IGNITE-20842 Introduce Job repeatable mechanism (#2956)
---
 .../ignite/internal/compute/ExecutionOptions.java  |  37 +++++-
 .../compute/executor/ComputeExecutorImpl.java      |   3 +-
 .../compute/executor/QueueExecutionImpl.java       |  88 -------------
 .../compute/queue/PriorityQueueExecutor.java       |  43 ++----
 .../ignite/internal/compute/queue/QueueEntry.java  |   7 +-
 .../internal/compute/queue/QueueExecutionImpl.java | 146 +++++++++++++++++++++
 .../compute/state/ComputeStateMachine.java         |   8 ++
 .../compute/state/InMemoryComputeStateMachine.java |   7 +-
 .../compute/executor/ComputeExecutorTest.java      |  92 +++++++++++++
 .../compute/queue/PriorityQueueExecutorTest.java   |  75 +++++++++--
 .../state/InMemoryComputeStateMachineTest.java     |  46 ++++++-
 .../testframework/matchers/JobStatusMatcher.java   |   6 +-
 12 files changed, 409 insertions(+), 149 deletions(-)

diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionOptions.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionOptions.java
index 940534e554..b07f07e4ab 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionOptions.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionOptions.java
@@ -21,20 +21,53 @@ package org.apache.ignite.internal.compute;
  * Compute job execution options.
  */
 public class ExecutionOptions {
-    public static final ExecutionOptions DEFAULT = new ExecutionOptions(0);
+    public static final ExecutionOptions DEFAULT = builder().build();
 
     private final int priority;
 
+    private final int maxRetries;
+
     /**
      * Constructor.
      *
      * @param priority Job execution priority.
+     * @param maxRetries Number of times to retry job execution in case of 
failure, 0 to not retry.
      */
-    public ExecutionOptions(int priority) {
+    private ExecutionOptions(int priority, int maxRetries) {
         this.priority = priority;
+        this.maxRetries = maxRetries;
+    }
+
+    public static Builder builder() {
+        return new Builder();
     }
 
     public int priority() {
         return priority;
     }
+
+    public int maxRetries() {
+        return maxRetries;
+    }
+
+    /** Builder. */
+    public static class Builder {
+        private int priority;
+
+        private int maxRetries;
+
+        public Builder priority(int priority) {
+            this.priority = priority;
+            return this;
+        }
+
+        public Builder maxRetries(int maxRetries) {
+            this.maxRetries = maxRetries;
+            return this;
+        }
+
+        public ExecutionOptions build() {
+            return new ExecutionOptions(priority, maxRetries);
+        }
+    }
 }
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
index 4878c86690..a2792c52f2 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
@@ -80,7 +80,8 @@ public class ComputeExecutorImpl implements ComputeExecutor {
 
         QueueExecution<R> execution = executorService.submit(
                 () -> ComputeUtils.instantiateJob(jobClass).execute(context, 
args),
-                options.priority()
+                options.priority(),
+                options.maxRetries()
         );
 
         return new JobExecutionImpl<>(execution, isInterrupted);
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/QueueExecutionImpl.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/QueueExecutionImpl.java
deleted file mode 100644
index 08c896bae8..0000000000
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/QueueExecutionImpl.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.compute.executor;
-
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ThreadPoolExecutor;
-import org.apache.ignite.compute.JobStatus;
-import org.apache.ignite.internal.compute.queue.QueueEntry;
-import org.apache.ignite.internal.compute.queue.QueueExecution;
-import org.apache.ignite.internal.compute.state.ComputeStateMachine;
-import org.apache.ignite.internal.compute.state.IllegalJobStateTransition;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
-
-/**
- * Queue execution object implementation.
- *
- * @param <R> Job result type.
- */
-public class QueueExecutionImpl<R> implements QueueExecution<R> {
-    private static final IgniteLogger LOG = 
Loggers.forClass(QueueExecutionImpl.class);
-
-    private final UUID jobId;
-    private final CompletableFuture<R> future;
-    private final QueueEntry<R> queueEntry;
-    private final ThreadPoolExecutor executor;
-    private final ComputeStateMachine stateMachine;
-
-    /**
-     * Constructor.
-     *
-     * @param jobId Job id.
-     * @param queueEntry Queue entry.
-     * @param future Result future.
-     * @param executor Executor on which the queue entry is running.
-     * @param stateMachine State machine.
-     */
-    public QueueExecutionImpl(
-            UUID jobId,
-            QueueEntry<R> queueEntry,
-            CompletableFuture<R> future,
-            ThreadPoolExecutor executor,
-            ComputeStateMachine stateMachine
-    ) {
-        this.jobId = jobId;
-        this.future = future;
-        this.queueEntry = queueEntry;
-        this.executor = executor;
-        this.stateMachine = stateMachine;
-    }
-
-    @Override
-    public CompletableFuture<R> resultAsync() {
-        return future;
-    }
-
-    @Override
-    public JobStatus status() {
-        return stateMachine.currentStatus(jobId);
-    }
-
-    @Override
-    public void cancel() {
-        try {
-            stateMachine.cancelingJob(jobId);
-            executor.remove(queueEntry);
-            queueEntry.interrupt();
-        } catch (IllegalJobStateTransition e) {
-            LOG.info("Cannot cancel the job", e);
-        }
-    }
-}
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java
index c97ffd1ee6..363040aac5 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java
@@ -17,21 +17,15 @@
 
 package org.apache.ignite.internal.compute.queue;
 
-import static java.util.concurrent.CompletableFuture.failedFuture;
-import static org.apache.ignite.lang.ErrorGroups.Compute.QUEUE_OVERFLOW_ERR;
-
 import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.Callable;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.compute.configuration.ComputeConfiguration;
-import org.apache.ignite.internal.compute.executor.QueueExecutionImpl;
 import org.apache.ignite.internal.compute.state.ComputeStateMachine;
 import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.lang.IgniteException;
 
 /**
  * Compute job executor with priority mechanism.
@@ -71,50 +65,31 @@ public class PriorityQueueExecutor {
     /**
      * Submit job for execution. Job can be started immediately if queue is 
empty or will be added to queue with provided priority.
      *
+     * @param <R> Job result type.
      * @param job Execute job callable.
      * @param priority Job priority.
-     * @param <R> Job result type.
+     * @param maxRetries Number of retries of the execution after failure, 
{@code 0} means the execution will not be retried.
      * @return Completable future which will be finished when compute job 
finished.
      */
-    public <R> QueueExecution<R> submit(Callable<R> job, int priority) {
+    public <R> QueueExecution<R> submit(Callable<R> job, int priority, int 
maxRetries) {
         Objects.requireNonNull(job);
 
         UUID jobId = stateMachine.initJob();
-        QueueEntry<R> queueEntry = new QueueEntry<>(() -> {
-            stateMachine.executeJob(jobId);
-            return job.call();
-        }, priority);
-
-        return new QueueExecutionImpl<>(jobId, queueEntry, execute(queueEntry, 
jobId), executor, stateMachine);
+        QueueExecutionImpl<R> execution = new QueueExecutionImpl<>(jobId, job, 
priority, executor, stateMachine);
+        execution.run(maxRetries);
+        return execution;
     }
 
     /**
-     * Submit job for execution. Job can be started immediately if queue is 
empty or will be added to queue with default priority.
+     * Submit job for execution. Job can be started immediately if queue is 
empty or will be added to queue with default priority and no
+     * retries.
      *
      * @param job Execute job callable.
      * @param <R> Job result type.
      * @return Completable future which will be finished when compute job 
finished.
      */
     public <R> QueueExecution<R> submit(Callable<R> job) {
-        return submit(job, 0);
-    }
-
-    private <R> CompletableFuture<R> execute(QueueEntry<R> queueEntry, UUID 
jobId) {
-        try {
-            executor.execute(queueEntry);
-        } catch (QueueOverflowException e) {
-            return failedFuture(new IgniteException(QUEUE_OVERFLOW_ERR, e));
-        }
-        return queueEntry.toFuture()
-                .whenComplete((r, throwable) -> {
-                    if (queueEntry.isInterrupted()) {
-                        stateMachine.cancelJob(jobId);
-                    } else if (throwable != null) {
-                        stateMachine.failJob(jobId);
-                    } else {
-                        stateMachine.completeJob(jobId);
-                    }
-                });
+        return submit(job, 0, 0);
     }
 
     /**
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueEntry.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueEntry.java
index cbce1e2461..89e0fafba6 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueEntry.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueEntry.java
@@ -33,7 +33,7 @@ import org.jetbrains.annotations.Nullable;
  *
  * @param <R> Compute job return type.
  */
-public class QueueEntry<R> implements Runnable, Comparable<QueueEntry<R>> {
+class QueueEntry<R> implements Runnable, Comparable<QueueEntry<R>> {
     private static final AtomicLong seq = new AtomicLong(Long.MIN_VALUE);
 
     private final CompletableFuture<R> future = new CompletableFuture<>();
@@ -65,7 +65,6 @@ public class QueueEntry<R> implements Runnable, 
Comparable<QueueEntry<R>> {
 
     @Override
     public void run() {
-
         lock.lock();
         try {
             workerThread = Thread.currentThread();
@@ -99,7 +98,7 @@ public class QueueEntry<R> implements Runnable, 
Comparable<QueueEntry<R>> {
     /**
      * Sets interrupt status of the worker thread.
      */
-    public void interrupt() {
+    void interrupt() {
         // Interrupt under the lock to prevent interrupting thread used by the 
pool for another task
         lock.lock();
         try {
@@ -117,7 +116,7 @@ public class QueueEntry<R> implements Runnable, 
Comparable<QueueEntry<R>> {
      *
      * @return {@code true} when the execution was interrupted externally.
      */
-    public boolean isInterrupted() {
+    boolean isInterrupted() {
         return isInterrupted;
     }
 
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
new file mode 100644
index 0000000000..1158b4dcc6
--- /dev/null
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.queue;
+
+import static org.apache.ignite.lang.ErrorGroups.Compute.QUEUE_OVERFLOW_ERR;
+
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.internal.compute.state.ComputeStateMachine;
+import org.apache.ignite.internal.compute.state.IllegalJobStateTransition;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.lang.IgniteException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Queue execution object implementation.
+ *
+ * @param <R> Job result type.
+ */
+class QueueExecutionImpl<R> implements QueueExecution<R> {
+    private static final IgniteLogger LOG = 
Loggers.forClass(QueueExecutionImpl.class);
+
+    private final UUID jobId;
+    private final Callable<R> job;
+    private final int priority;
+    private final ThreadPoolExecutor executor;
+    private final ComputeStateMachine stateMachine;
+
+    private final CompletableFuture<R> result = new CompletableFuture<>();
+
+    private final AtomicReference<QueueEntry<R>> queueEntry = new 
AtomicReference<>();
+
+    /**
+     * Constructor.
+     *
+     * @param jobId Job id.
+     * @param job Execute job callable.
+     * @param priority Job priority.
+     * @param executor Executor on which the queue entry is running.
+     * @param stateMachine State machine.
+     */
+    QueueExecutionImpl(
+            UUID jobId,
+            Callable<R> job,
+            int priority,
+            ThreadPoolExecutor executor,
+            ComputeStateMachine stateMachine
+    ) {
+        this.jobId = jobId;
+        this.job = job;
+        this.priority = priority;
+        this.executor = executor;
+        this.stateMachine = stateMachine;
+    }
+
+    @Override
+    public CompletableFuture<R> resultAsync() {
+        return result;
+    }
+
+    @Override
+    public @Nullable JobStatus status() {
+        return stateMachine.currentStatus(jobId);
+    }
+
+    @Override
+    public void cancel() {
+        try {
+            stateMachine.cancelingJob(jobId);
+
+            QueueEntry<R> queueEntry = this.queueEntry.get();
+            if (queueEntry != null) {
+                executor.remove(queueEntry);
+                queueEntry.interrupt();
+            }
+        } catch (IllegalJobStateTransition e) {
+            LOG.info("Cannot cancel the job", e);
+        }
+    }
+
+    /**
+     * Runs the job, completing the result future and retrying the execution 
in case of failure at most {@code numRetries} times.
+     *
+     * @param numRetries Number of times to retry failed execution.
+     */
+    void run(int numRetries) {
+        QueueEntry<R> queueEntry = new QueueEntry<>(() -> {
+            stateMachine.executeJob(jobId);
+            return job.call();
+        }, priority);
+
+        // Ignoring previous value since it can't be running because we are 
calling run either after the construction or after the failure.
+        this.queueEntry.set(queueEntry);
+
+        try {
+            executor.execute(queueEntry);
+        } catch (QueueOverflowException e) {
+            result.completeExceptionally(new 
IgniteException(QUEUE_OVERFLOW_ERR, e));
+            return;
+        }
+
+        queueEntry.toFuture().whenComplete((r, throwable) -> {
+            if (throwable != null) {
+                if (numRetries > 0) {
+                    stateMachine.queueJob(jobId);
+                    run(numRetries - 1);
+                } else {
+                    result.completeExceptionally(throwable);
+                    if (queueEntry.isInterrupted()) {
+                        stateMachine.cancelJob(jobId);
+                    } else {
+                        stateMachine.failJob(jobId);
+                    }
+                }
+            } else {
+                result.complete(r);
+                if (queueEntry.isInterrupted()) {
+                    stateMachine.cancelJob(jobId);
+                } else {
+                    stateMachine.completeJob(jobId);
+                }
+            }
+        });
+    }
+
+}
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/state/ComputeStateMachine.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/state/ComputeStateMachine.java
index 03db3eb17c..0aa2ae7045 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/state/ComputeStateMachine.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/state/ComputeStateMachine.java
@@ -83,6 +83,14 @@ public interface ComputeStateMachine {
      */
     void failJob(UUID jobId);
 
+    /**
+     * Tries to transfer Compute Job to queued state from the {@link 
JobState#EXECUTING} state, used for retrying.
+     *
+     * @param jobId Compute job identifier.
+     * @throws IllegalJobStateTransition in case when job can't be transferred 
to queued state.
+     */
+    void queueJob(UUID jobId);
+
     /**
      * Returns current status of Compute Job.
      *
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/state/InMemoryComputeStateMachine.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/state/InMemoryComputeStateMachine.java
index 6e4d38af22..f35b84f4bb 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/state/InMemoryComputeStateMachine.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/state/InMemoryComputeStateMachine.java
@@ -120,6 +120,11 @@ public class InMemoryComputeStateMachine implements 
ComputeStateMachine {
         waitToRemove.add(jobId);
     }
 
+    @Override
+    public void queueJob(UUID jobId) {
+        changeJobState(jobId, QUEUED);
+    }
+
     @Override
     public void completeJob(UUID jobId) {
         changeJobState(jobId, COMPLETED);
@@ -199,7 +204,7 @@ public class InMemoryComputeStateMachine implements 
ComputeStateMachine {
             case QUEUED:
                 return toState == EXECUTING || toState == CANCELING || toState 
== CANCELED;
             case EXECUTING:
-                return toState == FAILED || toState == COMPLETED || toState == 
CANCELING || toState == CANCELED;
+                return toState == FAILED || toState == COMPLETED || toState == 
CANCELING || toState == CANCELED || toState == QUEUED;
             case CANCELING:
                 return toState == CANCELED || toState == FAILED || toState == 
COMPLETED;
             case FAILED:
diff --git 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
index bbf3ba1d9a..0bf77532ca 100644
--- 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
+++ 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
@@ -18,11 +18,17 @@
 package org.apache.ignite.internal.compute.executor;
 
 import static org.apache.ignite.compute.JobState.CANCELED;
+import static org.apache.ignite.compute.JobState.COMPLETED;
 import static org.apache.ignite.compute.JobState.EXECUTING;
+import static org.apache.ignite.compute.JobState.FAILED;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static 
org.apache.ignite.internal.testframework.matchers.JobStatusMatcher.jobStatusWithState;
 import static 
org.apache.ignite.internal.testframework.matchers.JobStatusMatcher.jobStatusWithStateAndCreateTimeStartTime;
 import static org.awaitility.Awaitility.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
 
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.JobExecution;
@@ -114,4 +120,90 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
         }
     }
 
+    @Test
+    void retryJobFail() {
+        AtomicInteger runTimes = new AtomicInteger();
+
+        int maxRetries = 5;
+
+        JobExecution<Integer> execution = computeExecutor.executeJob(
+                ExecutionOptions.builder().maxRetries(maxRetries).build(),
+                RetryJobFail.class,
+                new Object[]{runTimes}
+        );
+
+        await().until(execution::status, jobStatusWithState(FAILED));
+
+        assertThat(runTimes.get(), is(maxRetries + 1));
+    }
+
+    private static class RetryJobFail implements ComputeJob<Integer> {
+
+        @Override
+        public Integer execute(JobExecutionContext context, Object... args) {
+            AtomicInteger runTimes = (AtomicInteger) args[0];
+            runTimes.incrementAndGet();
+            throw new RuntimeException();
+        }
+    }
+
+    @Test
+    void retryJobSuccess() {
+        AtomicInteger runTimes = new AtomicInteger();
+
+        int maxRetries = 5;
+
+        JobExecution<Integer> execution = computeExecutor.executeJob(
+                ExecutionOptions.builder().maxRetries(maxRetries).build(),
+                RetryJobSuccess.class,
+                new Object[]{runTimes, maxRetries}
+        );
+
+        await().until(execution::status, jobStatusWithState(COMPLETED));
+
+        assertThat(runTimes.get(), is(maxRetries + 1));
+    }
+
+    private static class RetryJobSuccess implements ComputeJob<Integer> {
+
+        @Override
+        public Integer execute(JobExecutionContext context, Object... args) {
+            AtomicInteger runTimes = (AtomicInteger) args[0];
+            int maxRetries = (int) args[1];
+            if (runTimes.incrementAndGet() <= maxRetries) {
+                throw new RuntimeException();
+            }
+            return 0;
+        }
+
+    }
+
+    @Test
+    void runJobOnce() {
+        AtomicInteger runTimes = new AtomicInteger();
+
+        int maxRetries = 5;
+
+        JobExecution<Integer> execution = computeExecutor.executeJob(
+                ExecutionOptions.builder().maxRetries(maxRetries).build(),
+                JobSuccess.class,
+                new Object[]{runTimes}
+        );
+
+        await().until(execution::status, jobStatusWithState(COMPLETED));
+
+        assertThat(execution.resultAsync(), willBe(1));
+        assertThat(runTimes.get(), is(1));
+    }
+
+    private static class JobSuccess implements ComputeJob<Integer> {
+
+        @Override
+        public Integer execute(JobExecutionContext context, Object... args) {
+            AtomicInteger runTimes = (AtomicInteger) args[0];
+            return runTimes.incrementAndGet();
+        }
+
+    }
+
 }
diff --git 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
index 22b56e0061..9e18bc1758 100644
--- 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
+++ 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.compute.queue;
 import static org.apache.ignite.compute.JobState.CANCELED;
 import static org.apache.ignite.compute.JobState.COMPLETED;
 import static org.apache.ignite.compute.JobState.EXECUTING;
+import static org.apache.ignite.compute.JobState.FAILED;
 import static org.apache.ignite.compute.JobState.QUEUED;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn;
@@ -28,13 +29,13 @@ import static 
org.apache.ignite.internal.testframework.matchers.JobStatusMatcher
 import static 
org.apache.ignite.internal.testframework.matchers.JobStatusMatcher.jobStatusWithStateAndCreateTimeStartTime;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.compute.JobStatus;
 import org.apache.ignite.internal.compute.configuration.ComputeConfiguration;
 import org.apache.ignite.internal.compute.state.InMemoryComputeStateMachine;
@@ -233,10 +234,7 @@ public class PriorityQueueExecutorTest extends 
BaseIgniteAbstractTest {
             }
         });
 
-        JobStatus executingStatus = await().until(
-                execution::status,
-                jobStatusWithState(equalTo(EXECUTING))
-        );
+        JobStatus executingStatus = await().until(execution::status, 
jobStatusWithState(EXECUTING));
 
         execution.cancel();
 
@@ -256,10 +254,7 @@ public class PriorityQueueExecutorTest extends 
BaseIgniteAbstractTest {
             return 0;
         });
 
-        JobStatus executingStatus = await().until(
-                execution::status,
-                jobStatusWithState(equalTo(EXECUTING))
-        );
+        JobStatus executingStatus = await().until(execution::status, 
jobStatusWithState(EXECUTING));
 
         execution.cancel();
 
@@ -309,6 +304,60 @@ public class PriorityQueueExecutorTest extends 
BaseIgniteAbstractTest {
         assertThat(execution.resultAsync(), willTimeoutIn(100, 
TimeUnit.MILLISECONDS));
     }
 
+    @Test
+    void retryTaskFail() {
+        initExecutor(1);
+
+        AtomicInteger runTimes = new AtomicInteger();
+
+        int maxRetries = 5;
+
+        QueueExecution<Object> execution = priorityQueueExecutor.submit(() -> {
+            runTimes.incrementAndGet();
+            throw new RuntimeException();
+        }, 0, maxRetries);
+
+        await().until(execution::status, jobStatusWithState(FAILED));
+
+        assertThat(runTimes.get(), is(maxRetries + 1));
+    }
+
+    @Test
+    void retryTaskSuccess() {
+        initExecutor(1);
+
+        AtomicInteger runTimes = new AtomicInteger();
+
+        int maxRetries = 5;
+
+        QueueExecution<Object> execution = priorityQueueExecutor.submit(() -> {
+            if (runTimes.incrementAndGet() <= maxRetries) {
+                throw new RuntimeException();
+            }
+            return 0;
+        }, 0, maxRetries);
+
+        await().until(execution::status, jobStatusWithState(COMPLETED));
+
+        assertThat(runTimes.get(), is(maxRetries + 1));
+    }
+
+    @Test
+    void defaultTaskIsNotRetried() {
+        initExecutor(1);
+
+        AtomicInteger runTimes = new AtomicInteger();
+
+        QueueExecution<Object> execution = priorityQueueExecutor.submit(() -> {
+            runTimes.incrementAndGet();
+            throw new RuntimeException();
+        });
+
+        await().until(execution::status, jobStatusWithState(FAILED));
+
+        assertThat(runTimes.get(), is(1));
+    }
+
     private void initExecutor(int threads) {
         initExecutor(threads, Integer.MAX_VALUE);
     }
@@ -327,10 +376,14 @@ public class PriorityQueueExecutorTest extends 
BaseIgniteAbstractTest {
     }
 
     private <R> CompletableFuture<R> submit(Callable<R> job) {
-        return priorityQueueExecutor.submit(job).resultAsync();
+        return submit(job, 0);
     }
 
     private <R> CompletableFuture<R> submit(Callable<R> job, int priority) {
-        return priorityQueueExecutor.submit(job, priority).resultAsync();
+        return submit(job, priority, 0);
+    }
+
+    private <R> CompletableFuture<R> submit(Callable<R> job, int priority, int 
maxRetries) {
+        return priorityQueueExecutor.submit(job, priority, 
maxRetries).resultAsync();
     }
 }
diff --git 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/state/InMemoryComputeStateMachineTest.java
 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/state/InMemoryComputeStateMachineTest.java
index 97ccd9d5ab..e514a2b398 100644
--- 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/state/InMemoryComputeStateMachineTest.java
+++ 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/state/InMemoryComputeStateMachineTest.java
@@ -22,6 +22,8 @@ import static org.apache.ignite.compute.JobState.CANCELING;
 import static org.apache.ignite.compute.JobState.COMPLETED;
 import static org.apache.ignite.compute.JobState.EXECUTING;
 import static org.apache.ignite.compute.JobState.FAILED;
+import static org.apache.ignite.compute.JobState.QUEUED;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.AnythingMatcher.anything;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static 
org.apache.ignite.internal.testframework.matchers.JobStatusMatcher.jobStatusWithState;
@@ -39,7 +41,6 @@ import org.apache.ignite.internal.compute.configuration.ComputeConfiguration;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
-import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -72,6 +73,7 @@ public class InMemoryComputeStateMachineTest extends BaseIgniteAbstractTest {
     @Test
     public void testSubmit() {
         assertThat(jobId, is(notNullValue()));
+        assertThat(stateMachine.currentStatus(jobId), jobStatusWithState(QUEUED));
     }
 
     @Test
@@ -82,8 +84,7 @@ public class InMemoryComputeStateMachineTest extends BaseIgniteAbstractTest {
 
     @Test
     public void testCancel() {
-        stateMachine.cancelJob(jobId);
-        assertThat(stateMachine.currentStatus(jobId), jobStatusWithState(CANCELED));
+        cancelJob(false);
     }
 
     @Test
@@ -119,6 +120,12 @@ public class InMemoryComputeStateMachineTest extends BaseIgniteAbstractTest {
         completeJob(false);
     }
 
+    @Test
+    public void testQueue() {
+        executeJob(false);
+        queueJob(false);
+    }
+
     @Test
     public void testDoubleExecution() {
         executeJob(false);
@@ -141,6 +148,14 @@ public class InMemoryComputeStateMachineTest extends BaseIgniteAbstractTest {
         failJob(true);
     }
 
+    @Test
+    public void testDoubleQueue() {
+        executeJob(false);
+
+        queueJob(false);
+        queueJob(true);
+    }
+
     @Test
     public void testCleanStates() throws InterruptedException {
         assertThat(configuration.change(change -> change.changeStatesLifetimeMillis(100)), willCompleteSuccessfully());
@@ -151,22 +166,22 @@ public class InMemoryComputeStateMachineTest extends BaseIgniteAbstractTest {
         jobId = stateMachine.initJob();
         executeJob(false);
         completeJob(false);
-        IgniteTestUtils.waitForCondition(() -> stateMachine.currentStatus(jobId) == null, 100);
+        waitForCondition(() -> stateMachine.currentStatus(jobId) == null, 100);
 
         jobId = stateMachine.initJob();
         executeJob(false);
         failJob(false);
-        IgniteTestUtils.waitForCondition(() -> stateMachine.currentStatus(jobId) == null, 100);
+        waitForCondition(() -> stateMachine.currentStatus(jobId) == null, 100);
 
         jobId = stateMachine.initJob();
         cancelJob(false);
-        IgniteTestUtils.waitForCondition(() -> stateMachine.currentStatus(jobId) == null, 100);
+        waitForCondition(() -> stateMachine.currentStatus(jobId) == null, 100);
 
         jobId = stateMachine.initJob();
         executeJob(false);
         cancelingJob(false);
         cancelJob(false);
-        IgniteTestUtils.waitForCondition(() -> stateMachine.currentStatus(jobId) == null, 100);
+        waitForCondition(() -> stateMachine.currentStatus(jobId) == null, 100);
 
         stateMachine.stop();
     }
@@ -255,4 +270,21 @@ public class InMemoryComputeStateMachineTest extends BaseIgniteAbstractTest {
             assertThrows(IllegalJobStateTransition.class, () -> stateMachine.failJob(jobId));
         }
     }
+
+    private void queueJob(boolean shouldFail) {
+        if (!shouldFail) {
+            stateMachine.queueJob(jobId);
+            assertThat(
+                    stateMachine.currentStatus(jobId),
+                    jobStatusWithStateAndCreateTimeStartTimeFinishTime(
+                            equalTo(QUEUED),
+                            notNullValue(Instant.class),
+                            notNullValue(Instant.class),
+                            nullValue(Instant.class)
+                    )
+            );
+        } else {
+            assertThrows(IllegalJobStateTransition.class, () -> stateMachine.queueJob(jobId));
+        }
+    }
 }
diff --git a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/JobStatusMatcher.java b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/JobStatusMatcher.java
index 2cd563a03f..06799e1f6d 100644
--- a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/JobStatusMatcher.java
+++ b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/JobStatusMatcher.java
@@ -203,9 +203,13 @@ public class JobStatusMatcher extends BaseMatcher<JobStatus> {
             mismatchDescription.appendText("was not a JobStatus: ").appendValue(actual);
         } else {
             JobStatus status = (JobStatus) actual;
+            mismatchDescription.appendText("state ");
             stateMatcher.describeMismatch(status.state(), mismatchDescription);
+            mismatchDescription.appendText(", create time ");
             createTimeMatcher.describeMismatch(status.createTime(), mismatchDescription);
+            mismatchDescription.appendText(", start time ");
             startTimeMatcher.describeMismatch(status.startTime(), mismatchDescription);
+            mismatchDescription.appendText(" and finish time ");
             finishTimeMatcher.describeMismatch(status.finishTime(), mismatchDescription);
         }
     }
@@ -218,7 +222,7 @@ public class JobStatusMatcher extends BaseMatcher<JobStatus> {
                 .appendDescriptionOf(createTimeMatcher)
                 .appendText(", start time ")
                 .appendDescriptionOf(startTimeMatcher)
-                .appendText(", and finish time ")
+                .appendText(" and finish time ")
                 .appendDescriptionOf(finishTimeMatcher);
     }
 }


Reply via email to