This is an automated email from the ASF dual-hosted git repository. apkhmv pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new f2d7d51b3a IGNITE-20842 Introduce Job repeatable mechanism (#2956) f2d7d51b3a is described below commit f2d7d51b3aee81a1b08164ad2d8f14d674993bf0 Author: Vadim Pakhnushev <8614891+valep...@users.noreply.github.com> AuthorDate: Mon Dec 18 14:59:02 2023 +0300 IGNITE-20842 Introduce Job repeatable mechanism (#2956) --- .../ignite/internal/compute/ExecutionOptions.java | 37 +++++- .../compute/executor/ComputeExecutorImpl.java | 3 +- .../compute/executor/QueueExecutionImpl.java | 88 ------------- .../compute/queue/PriorityQueueExecutor.java | 43 ++---- .../ignite/internal/compute/queue/QueueEntry.java | 7 +- .../internal/compute/queue/QueueExecutionImpl.java | 146 +++++++++++++++++++++ .../compute/state/ComputeStateMachine.java | 8 ++ .../compute/state/InMemoryComputeStateMachine.java | 7 +- .../compute/executor/ComputeExecutorTest.java | 92 +++++++++++++ .../compute/queue/PriorityQueueExecutorTest.java | 75 +++++++++-- .../state/InMemoryComputeStateMachineTest.java | 46 ++++++- .../testframework/matchers/JobStatusMatcher.java | 6 +- 12 files changed, 409 insertions(+), 149 deletions(-) diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionOptions.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionOptions.java index 940534e554..b07f07e4ab 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionOptions.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionOptions.java @@ -21,20 +21,53 @@ package org.apache.ignite.internal.compute; * Compute job execution options. */ public class ExecutionOptions { - public static final ExecutionOptions DEFAULT = new ExecutionOptions(0); + public static final ExecutionOptions DEFAULT = builder().build(); private final int priority; + private final int maxRetries; + /** * Constructor. * * @param priority Job execution priority. 
+ * @param maxRetries Number of times to retry job execution in case of failure, 0 to not retry. */ - public ExecutionOptions(int priority) { + private ExecutionOptions(int priority, int maxRetries) { this.priority = priority; + this.maxRetries = maxRetries; + } + + public static Builder builder() { + return new Builder(); } public int priority() { return priority; } + + public int maxRetries() { + return maxRetries; + } + + /** Builder. */ + public static class Builder { + private int priority; + + private int maxRetries; + + public Builder priority(int priority) { + this.priority = priority; + return this; + } + + public Builder maxRetries(int maxRetries) { + this.maxRetries = maxRetries; + return this; + } + + public ExecutionOptions build() { + return new ExecutionOptions(priority, maxRetries); + } + } } diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java index 4878c86690..a2792c52f2 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java @@ -80,7 +80,8 @@ public class ComputeExecutorImpl implements ComputeExecutor { QueueExecution<R> execution = executorService.submit( () -> ComputeUtils.instantiateJob(jobClass).execute(context, args), - options.priority() + options.priority(), + options.maxRetries() ); return new JobExecutionImpl<>(execution, isInterrupted); diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/QueueExecutionImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/QueueExecutionImpl.java deleted file mode 100644 index 08c896bae8..0000000000 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/QueueExecutionImpl.java +++ /dev/null @@ -1,88 
+0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.compute.executor; - -import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ThreadPoolExecutor; -import org.apache.ignite.compute.JobStatus; -import org.apache.ignite.internal.compute.queue.QueueEntry; -import org.apache.ignite.internal.compute.queue.QueueExecution; -import org.apache.ignite.internal.compute.state.ComputeStateMachine; -import org.apache.ignite.internal.compute.state.IllegalJobStateTransition; -import org.apache.ignite.internal.logger.IgniteLogger; -import org.apache.ignite.internal.logger.Loggers; - -/** - * Queue execution object implementation. - * - * @param <R> Job result type. - */ -public class QueueExecutionImpl<R> implements QueueExecution<R> { - private static final IgniteLogger LOG = Loggers.forClass(QueueExecutionImpl.class); - - private final UUID jobId; - private final CompletableFuture<R> future; - private final QueueEntry<R> queueEntry; - private final ThreadPoolExecutor executor; - private final ComputeStateMachine stateMachine; - - /** - * Constructor. - * - * @param jobId Job id. - * @param queueEntry Queue entry. 
- * @param future Result future. - * @param executor Executor on which the queue entry is running. - * @param stateMachine State machine. - */ - public QueueExecutionImpl( - UUID jobId, - QueueEntry<R> queueEntry, - CompletableFuture<R> future, - ThreadPoolExecutor executor, - ComputeStateMachine stateMachine - ) { - this.jobId = jobId; - this.future = future; - this.queueEntry = queueEntry; - this.executor = executor; - this.stateMachine = stateMachine; - } - - @Override - public CompletableFuture<R> resultAsync() { - return future; - } - - @Override - public JobStatus status() { - return stateMachine.currentStatus(jobId); - } - - @Override - public void cancel() { - try { - stateMachine.cancelingJob(jobId); - executor.remove(queueEntry); - queueEntry.interrupt(); - } catch (IllegalJobStateTransition e) { - LOG.info("Cannot cancel the job", e); - } - } -} diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java index c97ffd1ee6..363040aac5 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java @@ -17,21 +17,15 @@ package org.apache.ignite.internal.compute.queue; -import static java.util.concurrent.CompletableFuture.failedFuture; -import static org.apache.ignite.lang.ErrorGroups.Compute.QUEUE_OVERFLOW_ERR; - import java.util.Objects; import java.util.UUID; import java.util.concurrent.Callable; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.ignite.internal.compute.configuration.ComputeConfiguration; -import org.apache.ignite.internal.compute.executor.QueueExecutionImpl; import 
org.apache.ignite.internal.compute.state.ComputeStateMachine; import org.apache.ignite.internal.util.IgniteUtils; -import org.apache.ignite.lang.IgniteException; /** * Compute job executor with priority mechanism. @@ -71,50 +65,31 @@ public class PriorityQueueExecutor { /** * Submit job for execution. Job can be started immediately if queue is empty or will be added to queue with provided priority. * + * @param <R> Job result type. * @param job Execute job callable. * @param priority Job priority. - * @param <R> Job result type. + * @param maxRetries Number of retries of the execution after failure, {@code 0} means the execution will not be retried. * @return Completable future which will be finished when compute job finished. */ - public <R> QueueExecution<R> submit(Callable<R> job, int priority) { + public <R> QueueExecution<R> submit(Callable<R> job, int priority, int maxRetries) { Objects.requireNonNull(job); UUID jobId = stateMachine.initJob(); - QueueEntry<R> queueEntry = new QueueEntry<>(() -> { - stateMachine.executeJob(jobId); - return job.call(); - }, priority); - - return new QueueExecutionImpl<>(jobId, queueEntry, execute(queueEntry, jobId), executor, stateMachine); + QueueExecutionImpl<R> execution = new QueueExecutionImpl<>(jobId, job, priority, executor, stateMachine); + execution.run(maxRetries); + return execution; } /** - * Submit job for execution. Job can be started immediately if queue is empty or will be added to queue with default priority. + * Submit job for execution. Job can be started immediately if queue is empty or will be added to queue with default priority and no + * retries. * * @param job Execute job callable. * @param <R> Job result type. * @return Completable future which will be finished when compute job finished. 
*/ public <R> QueueExecution<R> submit(Callable<R> job) { - return submit(job, 0); - } - - private <R> CompletableFuture<R> execute(QueueEntry<R> queueEntry, UUID jobId) { - try { - executor.execute(queueEntry); - } catch (QueueOverflowException e) { - return failedFuture(new IgniteException(QUEUE_OVERFLOW_ERR, e)); - } - return queueEntry.toFuture() - .whenComplete((r, throwable) -> { - if (queueEntry.isInterrupted()) { - stateMachine.cancelJob(jobId); - } else if (throwable != null) { - stateMachine.failJob(jobId); - } else { - stateMachine.completeJob(jobId); - } - }); + return submit(job, 0, 0); } /** diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueEntry.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueEntry.java index cbce1e2461..89e0fafba6 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueEntry.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueEntry.java @@ -33,7 +33,7 @@ import org.jetbrains.annotations.Nullable; * * @param <R> Compute job return type. */ -public class QueueEntry<R> implements Runnable, Comparable<QueueEntry<R>> { +class QueueEntry<R> implements Runnable, Comparable<QueueEntry<R>> { private static final AtomicLong seq = new AtomicLong(Long.MIN_VALUE); private final CompletableFuture<R> future = new CompletableFuture<>(); @@ -65,7 +65,6 @@ public class QueueEntry<R> implements Runnable, Comparable<QueueEntry<R>> { @Override public void run() { - lock.lock(); try { workerThread = Thread.currentThread(); @@ -99,7 +98,7 @@ public class QueueEntry<R> implements Runnable, Comparable<QueueEntry<R>> { /** * Sets interrupt status of the worker thread. 
*/ - public void interrupt() { + void interrupt() { // Interrupt under the lock to prevent interrupting thread used by the pool for another task lock.lock(); try { @@ -117,7 +116,7 @@ public class QueueEntry<R> implements Runnable, Comparable<QueueEntry<R>> { * * @return {@code true} when the execution was interrupted externally. */ - public boolean isInterrupted() { + boolean isInterrupted() { return isInterrupted; } diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java new file mode 100644 index 0000000000..1158b4dcc6 --- /dev/null +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.compute.queue; + +import static org.apache.ignite.lang.ErrorGroups.Compute.QUEUE_OVERFLOW_ERR; + +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.ignite.compute.JobStatus; +import org.apache.ignite.internal.compute.state.ComputeStateMachine; +import org.apache.ignite.internal.compute.state.IllegalJobStateTransition; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.lang.IgniteException; +import org.jetbrains.annotations.Nullable; + +/** + * Queue execution object implementation. + * + * @param <R> Job result type. + */ +class QueueExecutionImpl<R> implements QueueExecution<R> { + private static final IgniteLogger LOG = Loggers.forClass(QueueExecutionImpl.class); + + private final UUID jobId; + private final Callable<R> job; + private final int priority; + private final ThreadPoolExecutor executor; + private final ComputeStateMachine stateMachine; + + private final CompletableFuture<R> result = new CompletableFuture<>(); + + private final AtomicReference<QueueEntry<R>> queueEntry = new AtomicReference<>(); + + /** + * Constructor. + * + * @param jobId Job id. + * @param job Execute job callable. + * @param priority Job priority. + * @param executor Executor on which the queue entry is running. + * @param stateMachine State machine. 
+ */ + QueueExecutionImpl( + UUID jobId, + Callable<R> job, + int priority, + ThreadPoolExecutor executor, + ComputeStateMachine stateMachine + ) { + this.jobId = jobId; + this.job = job; + this.priority = priority; + this.executor = executor; + this.stateMachine = stateMachine; + } + + @Override + public CompletableFuture<R> resultAsync() { + return result; + } + + @Override + public @Nullable JobStatus status() { + return stateMachine.currentStatus(jobId); + } + + @Override + public void cancel() { + try { + stateMachine.cancelingJob(jobId); + + QueueEntry<R> queueEntry = this.queueEntry.get(); + if (queueEntry != null) { + executor.remove(queueEntry); + queueEntry.interrupt(); + } + } catch (IllegalJobStateTransition e) { + LOG.info("Cannot cancel the job", e); + } + } + + /** + * Runs the job, completing the result future and retrying the execution in case of failure at most {@code numRetries} times. + * + * @param numRetries Number of times to retry failed execution. + */ + void run(int numRetries) { + QueueEntry<R> queueEntry = new QueueEntry<>(() -> { + stateMachine.executeJob(jobId); + return job.call(); + }, priority); + + // Ignoring previous value since it can't be running because we are calling run either after the construction or after the failure. 
this.queueEntry.set(queueEntry); + + try { + executor.execute(queueEntry); + } catch (QueueOverflowException e) { + result.completeExceptionally(new IgniteException(QUEUE_OVERFLOW_ERR, e)); + return; + } + + queueEntry.toFuture().whenComplete((r, throwable) -> { + if (throwable != null) { + if (numRetries > 0) { + stateMachine.queueJob(jobId); + run(numRetries - 1); + } else { + result.completeExceptionally(throwable); + if (queueEntry.isInterrupted()) { + stateMachine.cancelJob(jobId); + } else { + stateMachine.failJob(jobId); + } + } + } else { + result.complete(r); + if (queueEntry.isInterrupted()) { + stateMachine.cancelJob(jobId); + } else { + stateMachine.completeJob(jobId); + } + } + }); + } + +} diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/state/ComputeStateMachine.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/state/ComputeStateMachine.java index 03db3eb17c..0aa2ae7045 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/state/ComputeStateMachine.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/state/ComputeStateMachine.java @@ -83,6 +83,14 @@ public interface ComputeStateMachine { */ void failJob(UUID jobId); + /** + * Tries to transfer Compute Job to queued state from the {@link JobState#EXECUTING} state, used for retrying. + * + * @param jobId Compute job identifier. + * @throws IllegalJobStateTransition in case when job can't be transferred to queued state. + */ + void queueJob(UUID jobId); + /** * Returns current status of Compute Job.
* diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/state/InMemoryComputeStateMachine.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/state/InMemoryComputeStateMachine.java index 6e4d38af22..f35b84f4bb 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/state/InMemoryComputeStateMachine.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/state/InMemoryComputeStateMachine.java @@ -120,6 +120,11 @@ public class InMemoryComputeStateMachine implements ComputeStateMachine { waitToRemove.add(jobId); } + @Override + public void queueJob(UUID jobId) { + changeJobState(jobId, QUEUED); + } + @Override public void completeJob(UUID jobId) { changeJobState(jobId, COMPLETED); @@ -199,7 +204,7 @@ public class InMemoryComputeStateMachine implements ComputeStateMachine { case QUEUED: return toState == EXECUTING || toState == CANCELING || toState == CANCELED; case EXECUTING: - return toState == FAILED || toState == COMPLETED || toState == CANCELING || toState == CANCELED; + return toState == FAILED || toState == COMPLETED || toState == CANCELING || toState == CANCELED || toState == QUEUED; case CANCELING: return toState == CANCELED || toState == FAILED || toState == COMPLETED; case FAILED: diff --git a/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java index bbf3ba1d9a..0bf77532ca 100644 --- a/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java +++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java @@ -18,11 +18,17 @@ package org.apache.ignite.internal.compute.executor; import static org.apache.ignite.compute.JobState.CANCELED; +import static org.apache.ignite.compute.JobState.COMPLETED; import static 
org.apache.ignite.compute.JobState.EXECUTING; +import static org.apache.ignite.compute.JobState.FAILED; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; import static org.apache.ignite.internal.testframework.matchers.JobStatusMatcher.jobStatusWithState; import static org.apache.ignite.internal.testframework.matchers.JobStatusMatcher.jobStatusWithStateAndCreateTimeStartTime; import static org.awaitility.Awaitility.await; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.Ignite; import org.apache.ignite.compute.ComputeJob; import org.apache.ignite.compute.JobExecution; @@ -114,4 +120,90 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest { } } + @Test + void retryJobFail() { + AtomicInteger runTimes = new AtomicInteger(); + + int maxRetries = 5; + + JobExecution<Integer> execution = computeExecutor.executeJob( + ExecutionOptions.builder().maxRetries(maxRetries).build(), + RetryJobFail.class, + new Object[]{runTimes} + ); + + await().until(execution::status, jobStatusWithState(FAILED)); + + assertThat(runTimes.get(), is(maxRetries + 1)); + } + + private static class RetryJobFail implements ComputeJob<Integer> { + + @Override + public Integer execute(JobExecutionContext context, Object... 
args) { + AtomicInteger runTimes = (AtomicInteger) args[0]; + runTimes.incrementAndGet(); + throw new RuntimeException(); + } + } + + @Test + void retryJobSuccess() { + AtomicInteger runTimes = new AtomicInteger(); + + int maxRetries = 5; + + JobExecution<Integer> execution = computeExecutor.executeJob( + ExecutionOptions.builder().maxRetries(maxRetries).build(), + RetryJobSuccess.class, + new Object[]{runTimes, maxRetries} + ); + + await().until(execution::status, jobStatusWithState(COMPLETED)); + + assertThat(runTimes.get(), is(maxRetries + 1)); + } + + private static class RetryJobSuccess implements ComputeJob<Integer> { + + @Override + public Integer execute(JobExecutionContext context, Object... args) { + AtomicInteger runTimes = (AtomicInteger) args[0]; + int maxRetries = (int) args[1]; + if (runTimes.incrementAndGet() <= maxRetries) { + throw new RuntimeException(); + } + return 0; + } + + } + + @Test + void runJobOnce() { + AtomicInteger runTimes = new AtomicInteger(); + + int maxRetries = 5; + + JobExecution<Integer> execution = computeExecutor.executeJob( + ExecutionOptions.builder().maxRetries(maxRetries).build(), + JobSuccess.class, + new Object[]{runTimes} + ); + + await().until(execution::status, jobStatusWithState(COMPLETED)); + + assertThat(execution.resultAsync(), willBe(1)); + assertThat(runTimes.get(), is(1)); + } + + private static class JobSuccess implements ComputeJob<Integer> { + + @Override + public Integer execute(JobExecutionContext context, Object... 
args) { + AtomicInteger runTimes = (AtomicInteger) args[0]; + return runTimes.incrementAndGet(); + } + + } + } diff --git a/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java index 22b56e0061..9e18bc1758 100644 --- a/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java +++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.compute.queue; import static org.apache.ignite.compute.JobState.CANCELED; import static org.apache.ignite.compute.JobState.COMPLETED; import static org.apache.ignite.compute.JobState.EXECUTING; +import static org.apache.ignite.compute.JobState.FAILED; import static org.apache.ignite.compute.JobState.QUEUED; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn; @@ -28,13 +29,13 @@ import static org.apache.ignite.internal.testframework.matchers.JobStatusMatcher import static org.apache.ignite.internal.testframework.matchers.JobStatusMatcher.jobStatusWithStateAndCreateTimeStartTime; import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.compute.JobStatus; import org.apache.ignite.internal.compute.configuration.ComputeConfiguration; import org.apache.ignite.internal.compute.state.InMemoryComputeStateMachine; @@ 
-233,10 +234,7 @@ public class PriorityQueueExecutorTest extends BaseIgniteAbstractTest { } }); - JobStatus executingStatus = await().until( - execution::status, - jobStatusWithState(equalTo(EXECUTING)) - ); + JobStatus executingStatus = await().until(execution::status, jobStatusWithState(EXECUTING)); execution.cancel(); @@ -256,10 +254,7 @@ public class PriorityQueueExecutorTest extends BaseIgniteAbstractTest { return 0; }); - JobStatus executingStatus = await().until( - execution::status, - jobStatusWithState(equalTo(EXECUTING)) - ); + JobStatus executingStatus = await().until(execution::status, jobStatusWithState(EXECUTING)); execution.cancel(); @@ -309,6 +304,60 @@ public class PriorityQueueExecutorTest extends BaseIgniteAbstractTest { assertThat(execution.resultAsync(), willTimeoutIn(100, TimeUnit.MILLISECONDS)); } + @Test + void retryTaskFail() { + initExecutor(1); + + AtomicInteger runTimes = new AtomicInteger(); + + int maxRetries = 5; + + QueueExecution<Object> execution = priorityQueueExecutor.submit(() -> { + runTimes.incrementAndGet(); + throw new RuntimeException(); + }, 0, maxRetries); + + await().until(execution::status, jobStatusWithState(FAILED)); + + assertThat(runTimes.get(), is(maxRetries + 1)); + } + + @Test + void retryTaskSuccess() { + initExecutor(1); + + AtomicInteger runTimes = new AtomicInteger(); + + int maxRetries = 5; + + QueueExecution<Object> execution = priorityQueueExecutor.submit(() -> { + if (runTimes.incrementAndGet() <= maxRetries) { + throw new RuntimeException(); + } + return 0; + }, 0, maxRetries); + + await().until(execution::status, jobStatusWithState(COMPLETED)); + + assertThat(runTimes.get(), is(maxRetries + 1)); + } + + @Test + void defaultTaskIsNotRetried() { + initExecutor(1); + + AtomicInteger runTimes = new AtomicInteger(); + + QueueExecution<Object> execution = priorityQueueExecutor.submit(() -> { + runTimes.incrementAndGet(); + throw new RuntimeException(); + }); + + await().until(execution::status, 
jobStatusWithState(FAILED)); + + assertThat(runTimes.get(), is(1)); + } + private void initExecutor(int threads) { initExecutor(threads, Integer.MAX_VALUE); } @@ -327,10 +376,14 @@ public class PriorityQueueExecutorTest extends BaseIgniteAbstractTest { } private <R> CompletableFuture<R> submit(Callable<R> job) { - return priorityQueueExecutor.submit(job).resultAsync(); + return submit(job, 0); } private <R> CompletableFuture<R> submit(Callable<R> job, int priority) { - return priorityQueueExecutor.submit(job, priority).resultAsync(); + return submit(job, priority, 0); + } + + private <R> CompletableFuture<R> submit(Callable<R> job, int priority, int maxRetries) { + return priorityQueueExecutor.submit(job, priority, maxRetries).resultAsync(); } } diff --git a/modules/compute/src/test/java/org/apache/ignite/internal/compute/state/InMemoryComputeStateMachineTest.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/state/InMemoryComputeStateMachineTest.java index 97ccd9d5ab..e514a2b398 100644 --- a/modules/compute/src/test/java/org/apache/ignite/internal/compute/state/InMemoryComputeStateMachineTest.java +++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/state/InMemoryComputeStateMachineTest.java @@ -22,6 +22,8 @@ import static org.apache.ignite.compute.JobState.CANCELING; import static org.apache.ignite.compute.JobState.COMPLETED; import static org.apache.ignite.compute.JobState.EXECUTING; import static org.apache.ignite.compute.JobState.FAILED; +import static org.apache.ignite.compute.JobState.QUEUED; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static org.apache.ignite.internal.testframework.matchers.AnythingMatcher.anything; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.apache.ignite.internal.testframework.matchers.JobStatusMatcher.jobStatusWithState; @@ -39,7 +41,6 @@ import 
org.apache.ignite.internal.compute.configuration.ComputeConfiguration; import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; -import org.apache.ignite.internal.testframework.IgniteTestUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -72,6 +73,7 @@ public class InMemoryComputeStateMachineTest extends BaseIgniteAbstractTest { @Test public void testSubmit() { assertThat(jobId, is(notNullValue())); + assertThat(stateMachine.currentStatus(jobId), jobStatusWithState(QUEUED)); } @Test @@ -82,8 +84,7 @@ public class InMemoryComputeStateMachineTest extends BaseIgniteAbstractTest { @Test public void testCancel() { - stateMachine.cancelJob(jobId); - assertThat(stateMachine.currentStatus(jobId), jobStatusWithState(CANCELED)); + cancelJob(false); } @Test @@ -119,6 +120,12 @@ public class InMemoryComputeStateMachineTest extends BaseIgniteAbstractTest { completeJob(false); } + @Test + public void testQueue() { + executeJob(false); + queueJob(false); + } + @Test public void testDoubleExecution() { executeJob(false); @@ -141,6 +148,14 @@ public class InMemoryComputeStateMachineTest extends BaseIgniteAbstractTest { failJob(true); } + @Test + public void testDoubleQueue() { + executeJob(false); + + queueJob(false); + queueJob(true); + } + @Test public void testCleanStates() throws InterruptedException { assertThat(configuration.change(change -> change.changeStatesLifetimeMillis(100)), willCompleteSuccessfully()); @@ -151,22 +166,22 @@ public class InMemoryComputeStateMachineTest extends BaseIgniteAbstractTest { jobId = stateMachine.initJob(); executeJob(false); completeJob(false); - IgniteTestUtils.waitForCondition(() -> stateMachine.currentStatus(jobId) == null, 100); + waitForCondition(() -> 
stateMachine.currentStatus(jobId) == null, 100); jobId = stateMachine.initJob(); executeJob(false); failJob(false); - IgniteTestUtils.waitForCondition(() -> stateMachine.currentStatus(jobId) == null, 100); + waitForCondition(() -> stateMachine.currentStatus(jobId) == null, 100); jobId = stateMachine.initJob(); cancelJob(false); - IgniteTestUtils.waitForCondition(() -> stateMachine.currentStatus(jobId) == null, 100); + waitForCondition(() -> stateMachine.currentStatus(jobId) == null, 100); jobId = stateMachine.initJob(); executeJob(false); cancelingJob(false); cancelJob(false); - IgniteTestUtils.waitForCondition(() -> stateMachine.currentStatus(jobId) == null, 100); + waitForCondition(() -> stateMachine.currentStatus(jobId) == null, 100); stateMachine.stop(); } @@ -255,4 +270,21 @@ public class InMemoryComputeStateMachineTest extends BaseIgniteAbstractTest { assertThrows(IllegalJobStateTransition.class, () -> stateMachine.failJob(jobId)); } } + + private void queueJob(boolean shouldFail) { + if (!shouldFail) { + stateMachine.queueJob(jobId); + assertThat( + stateMachine.currentStatus(jobId), + jobStatusWithStateAndCreateTimeStartTimeFinishTime( + equalTo(QUEUED), + notNullValue(Instant.class), + notNullValue(Instant.class), + nullValue(Instant.class) + ) + ); + } else { + assertThrows(IllegalJobStateTransition.class, () -> stateMachine.queueJob(jobId)); + } + } } diff --git a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/JobStatusMatcher.java b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/JobStatusMatcher.java index 2cd563a03f..06799e1f6d 100644 --- a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/JobStatusMatcher.java +++ b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/JobStatusMatcher.java @@ -203,9 +203,13 @@ public class JobStatusMatcher extends BaseMatcher<JobStatus> { mismatchDescription.appendText("was not 
a JobStatus: ").appendValue(actual); } else { JobStatus status = (JobStatus) actual; + mismatchDescription.appendText("state "); stateMatcher.describeMismatch(status.state(), mismatchDescription); + mismatchDescription.appendText(", create time "); createTimeMatcher.describeMismatch(status.createTime(), mismatchDescription); + mismatchDescription.appendText(", start time "); startTimeMatcher.describeMismatch(status.startTime(), mismatchDescription); + mismatchDescription.appendText(" and finish time "); finishTimeMatcher.describeMismatch(status.finishTime(), mismatchDescription); } } @@ -218,7 +222,7 @@ public class JobStatusMatcher extends BaseMatcher<JobStatus> { .appendDescriptionOf(createTimeMatcher) .appendText(", start time ") .appendDescriptionOf(startTimeMatcher) - .appendText(", and finish time ") + .appendText(" and finish time ") .appendDescriptionOf(finishTimeMatcher); } }