[ https://issues.apache.org/jira/browse/FLINK-8797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16380203#comment-16380203 ]
ASF GitHub Bot commented on FLINK-8797:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5589#discussion_r171228120
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
@@ -223,6 +223,78 @@
}
}
+	/**
+	 * Retry the given operation with the given delay in between successful completions where the
+	 * result does not match a given predicate.
+	 *
+	 * @param operation to retry
+	 * @param retries number of retries
+	 * @param retryDelay delay between retries
+	 * @param retryPredicate Predicate to test whether the result is acceptable
+	 * @param scheduledExecutor executor to be used for the retry operation
+	 * @param <T> type of the result
+	 * @return Future which retries the given operation a given amount of times and delays the retry
+	 *   in case the predicate isn't matched
+	 */
+	public static <T> CompletableFuture<T> retrySuccesfulWithDelay(
+		final Supplier<CompletableFuture<T>> operation,
+		final long retries,
+		final Time retryDelay,
+		final Predicate<T> retryPredicate,
+		final ScheduledExecutor scheduledExecutor) {
+
+		final CompletableFuture<T> resultFuture = new CompletableFuture<>();
+
+		retrySuccessfulOperationWithDelay(
+			resultFuture,
+			operation,
+			retries,
+			retryDelay,
+			retryPredicate,
+			scheduledExecutor);
+
+		return resultFuture;
+	}
+
+	private static <T> void retrySuccessfulOperationWithDelay(
+		final CompletableFuture<T> resultFuture,
+		final Supplier<CompletableFuture<T>> operation,
+		final long retries,
+		final Time retryDelay,
+		final Predicate<T> retryPredicate,
+		final ScheduledExecutor scheduledExecutor) {
+
+		if (!resultFuture.isDone()) {
+			final CompletableFuture<T> operationResultFuture = operation.get();
+
+			operationResultFuture.whenComplete(
+				(t, throwable) -> {
+					if (throwable != null) {
+						if (throwable instanceof CancellationException) {
+							resultFuture.completeExceptionally(new RetryException("Operation future was cancelled.", throwable));
+						} else {
+							resultFuture.completeExceptionally(throwable);
+						}
+					} else {
+						if (retries > 0 && !retryPredicate.test(t)) {
+							final ScheduledFuture<?> scheduledFuture = scheduledExecutor.schedule(
+								() -> retrySuccessfulOperationWithDelay(resultFuture, operation, retries - 1, retryDelay, retryPredicate, scheduledExecutor),
+								retryDelay.toMilliseconds(),
+								TimeUnit.MILLISECONDS);
+
+							resultFuture.whenComplete(
+								(innerT, innerThrowable) -> scheduledFuture.cancel(false));
+						} else {
+							resultFuture.complete(t);
--- End diff --
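To make the exhaustion behavior concrete, here is a minimal JDK-only sketch of the same retry-on-predicate logic. It assumes `java.util.concurrent.ScheduledExecutorService` in place of Flink's `ScheduledExecutor` and a plain millisecond delay in place of Flink's `Time`, and it omits the diff's `CancellationException`-to-`RetryException` wrapping; the class name `RetrySuccessfulSketch` is hypothetical. The point to notice is that the final `else` branch is taken both when the predicate matches and when the retries run out.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical JDK-only reduction of the helper in the diff (not Flink code).
public class RetrySuccessfulSketch {

    public static <T> CompletableFuture<T> retrySuccessfulWithDelay(
            Supplier<CompletableFuture<T>> operation,
            long retries,
            long retryDelayMillis,
            Predicate<T> acceptable,
            ScheduledExecutorService executor) {
        CompletableFuture<T> resultFuture = new CompletableFuture<>();
        attempt(resultFuture, operation, retries, retryDelayMillis, acceptable, executor);
        return resultFuture;
    }

    private static <T> void attempt(
            CompletableFuture<T> resultFuture,
            Supplier<CompletableFuture<T>> operation,
            long retries,
            long retryDelayMillis,
            Predicate<T> acceptable,
            ScheduledExecutorService executor) {
        if (resultFuture.isDone()) {
            return;
        }
        operation.get().whenComplete((t, throwable) -> {
            if (throwable != null) {
                // Failures of the operation itself are propagated immediately.
                resultFuture.completeExceptionally(throwable);
            } else if (retries > 0 && !acceptable.test(t)) {
                // Result not acceptable yet: schedule another attempt after the delay.
                ScheduledFuture<?> next = executor.schedule(
                        () -> attempt(resultFuture, operation, retries - 1, retryDelayMillis, acceptable, executor),
                        retryDelayMillis,
                        TimeUnit.MILLISECONDS);
                // Drop the pending attempt if the result future is completed from elsewhere.
                resultFuture.whenComplete((ignored, ignoredError) -> next.cancel(false));
            } else {
                // Reached when the predicate matches AND when retries are exhausted,
                // so the caller cannot tell the two cases apart without re-testing t.
                resultFuture.complete(t);
            }
        });
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger calls = new AtomicInteger();
        try {
            // The produced value never reaches 100, so every retry is used up.
            CompletableFuture<Integer> result = retrySuccessfulWithDelay(
                    () -> CompletableFuture.completedFuture(calls.incrementAndGet()),
                    3, 10, value -> value >= 100, executor);
            int value = result.get(5, TimeUnit.SECONDS);
            System.out.println("completed normally with " + value);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

With `retries = 3` the operation runs four times and the future completes normally with `4`, a value the predicate rejects. This is why the caller still needs its own final check.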
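This seems rather unfortunate: once the retries are exhausted, the returned future completes normally with the last result, even if that result does not satisfy the predicate. In the code sample below I wait for a job to be RUNNING. I can only replace the loop, not the final check, which really limits the usefulness: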
```
JobStatus jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(submissionDeadLine.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
while (jobStatus != JobStatus.RUNNING && submissionDeadLine.hasTimeLeft()) {
	Thread.sleep(50);
	jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(submissionDeadLine.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
}
if (jobStatus != JobStatus.RUNNING) {
	Assert.fail("Job not in state RUNNING.");
}

// ================== Comparison ===============================

CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
	() -> client.getJobStatus(jobSubmissionResult.getJobID()),
	submissionDeadLine.timeLeft().toMillis() / 50, // crappy retry count calculation
	Time.milliseconds(50),
	status -> status == JobStatus.RUNNING,
	TestingUtils.defaultScheduledExecutor()
);
if (jobStatusFuture.get(submissionDeadLine.timeLeft().toMillis(), TimeUnit.MILLISECONDS) != JobStatus.RUNNING) {
	Assert.fail("Job not in state RUNNING.");
}
```
In its current form this doesn't really provide value imo. (Yes, it's asynchronous, but in which tests do we really even need that?)
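One way to address all three points (the leftover final check, the hand-computed retry count, and the asynchrony most tests don't need) is a blocking, deadline-based polling helper that either returns a value satisfying the condition or throws. This is a hypothetical sketch: `PollUntil` and `pollUntil` are illustrative names, not part of Flink's `FutureUtils`, and the helper uses only `java.time.Duration` and `java.util.function` from the JDK.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical blocking test helper (not a Flink API).
public final class PollUntil {

    private PollUntil() {}

    /**
     * Polls {@code probe} every {@code pollInterval} until {@code condition} holds and returns
     * that value; throws a TimeoutException if {@code timeout} elapses first.
     */
    public static <T> T pollUntil(
            Supplier<T> probe,
            Predicate<T> condition,
            Duration pollInterval,
            Duration timeout) throws InterruptedException, TimeoutException {
        final long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (true) {
            T value = probe.get();
            if (condition.test(value)) {
                // The returned value is guaranteed to satisfy the condition.
                return value;
            }
            long remainingNanos = deadlineNanos - System.nanoTime();
            if (remainingNanos <= 0) {
                throw new TimeoutException(
                        "Condition not met within " + timeout + "; last value: " + value);
            }
            // Sleep for the poll interval, but at most (roughly) until the deadline.
            Thread.sleep(Math.min(pollInterval.toMillis(), Math.max(1, remainingNanos / 1_000_000)));
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated status that becomes RUNNING on the third poll.
        String status = pollUntil(
                () -> ++calls[0] >= 3 ? "RUNNING" : "CREATED",
                s -> s.equals("RUNNING"),
                Duration.ofMillis(10),
                Duration.ofSeconds(5));
        System.out.println(status + " after " + calls[0] + " polls");
    }
}
```

In the test above, the probe would wrap `client.getJobStatus(...).get(...)`; since `Supplier` cannot throw checked exceptions, those would have to be caught or rethrown unchecked inside the lambda. Passing the remaining time of `submissionDeadLine` as the timeout (e.g. `Duration.ofMillis(submissionDeadLine.timeLeft().toMillis())`) removes the need to derive a retry count.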
> Port AbstractOperatorRestoreTestBase to MiniClusterResource
> -----------------------------------------------------------
>
> Key: FLINK-8797
> URL: https://issues.apache.org/jira/browse/FLINK-8797
> Project: Flink
> Issue Type: Sub-task
> Components: Tests
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
> Priority: Blocker
> Fix For: 1.5.0
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)