dmvk commented on a change in pull request #18637:
URL: https://github.com/apache/flink/pull/18637#discussion_r800843432
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleanerTest.java
##########
@@ -19,50 +19,52 @@
package org.apache.flink.runtime.dispatcher.cleanup;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.testutils.FlinkAssertions;
import
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.Executors;
+import org.apache.flink.util.concurrent.FixedRetryStrategy;
+import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.concurrent.RetryStrategy;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
import java.util.function.Consumer;
+import static org.apache.flink.core.testutils.FlinkAssertions.STREAM_THROWABLE;
import static org.assertj.core.api.Assertions.assertThat;
/** {@code DefaultResourceCleanerTest} tests {@link DefaultResourceCleaner}. */
public class DefaultResourceCleanerTest {
+ private static final Duration TIMEOUT = Duration.ofMillis(100);
Review comment:
If we really need a timeout, it should be fairly high, e.g. 1 hour.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleaner.java
##########
@@ -21,42 +21,64 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.util.concurrent.ExponentialBackoffRetryStrategy;
import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.concurrent.RetryStrategy;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
-/** {@code DefaultResourceCleaner} is the default implementation of {@link
ResourceCleaner}. */
+/**
+ * {@code DefaultResourceCleaner} is the default implementation of {@link
ResourceCleaner}. It will
+ * try to clean up any resource that was added. Failure will result in an
individual retry of the
+ * cleanup. The overall cleanup result succeeds after all subtasks succeeded.
+ *
+ * <p>The retry strategy is defined in {@link #DEFAULT_RETRY_STRATEGY}.
+ */
public class DefaultResourceCleaner<T> implements ResourceCleaner {
+ private static final RetryStrategy DEFAULT_RETRY_STRATEGY =
Review comment:
Both min and max delay should be configurable.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleanerTest.java
##########
@@ -181,33 +212,97 @@ public void
testMediumPriorityCleanupBlocksAllLowerPrioritizedCleanups() {
assertThat(noPriorityCleanup1.isDone()).isTrue();
}
+ @Test
+ public void testCleanupWithRetries() {
+ final Collection<JobID> actualJobIds = new ArrayList<>();
+ final CleanupCallback cleanupWithRetries =
cleanupWithRetry(actualJobIds, 2);
+ final SingleCallCleanup oneRunCleanup =
SingleCallCleanup.withCompletionOnCleanup();
+
+ final CompletableFuture<Void> compositeCleanupResult =
+ createTestInstanceBuilder(new FixedRetryStrategy(2,
Duration.ZERO))
+ .withRegularCleanup(cleanupWithRetries)
+ .withRegularCleanup(oneRunCleanup)
+ .build()
+ .cleanupAsync(JOB_ID);
+
+
assertThat(compositeCleanupResult).succeedsWithin(Duration.ofMillis(100));
+
+ assertThat(oneRunCleanup.getProcessedJobId()).isEqualTo(JOB_ID);
+ assertThat(oneRunCleanup.isDone()).isTrue();
+ assertThat(actualJobIds).containsExactly(JOB_ID, JOB_ID, JOB_ID);
+ }
+
+ @Test
+ public void testCleanupWithSingleRetryInHighPriorityTask() {
+ final Collection<JobID> actualJobIds = new ArrayList<>();
+ final CleanupCallback cleanupWithRetry =
cleanupWithRetry(actualJobIds, 1);
+ final SingleCallCleanup oneRunCleanup =
SingleCallCleanup.withCompletionOnCleanup();
+
+ final CompletableFuture<Void> compositeCleanupResult =
+ createTestInstanceBuilder(new FixedRetryStrategy(1,
Duration.ZERO))
+ .withPrioritizedCleanup(cleanupWithRetry)
+ .withRegularCleanup(oneRunCleanup)
+ .build()
+ .cleanupAsync(JOB_ID);
+
+
assertThat(compositeCleanupResult).succeedsWithin(Duration.ofMillis(100));
+
+ assertThat(oneRunCleanup.getProcessedJobId()).isEqualTo(JOB_ID);
+ assertThat(oneRunCleanup.isDone()).isTrue();
+ assertThat(actualJobIds).containsExactly(JOB_ID, JOB_ID);
+ }
+
private static DefaultResourceCleaner.Builder<CleanupCallback>
createTestInstanceBuilder() {
+ return createTestInstanceBuilder(new FixedRetryStrategy(0,
Duration.ZERO));
+ }
+
+ private static DefaultResourceCleaner.Builder<CleanupCallback>
createTestInstanceBuilder(
+ RetryStrategy retryStrategy) {
return DefaultResourceCleaner.forCleanableResources(
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
EXECUTOR,
- CleanupCallback::cleanup);
+ CleanupCallback::apply,
+ retryStrategy);
+ }
+
+ private static CleanupCallback cleanupWithRetry(
+ Collection<JobID> actualJobIds, int numberOfFailureRuns) {
+ final AtomicInteger failureRunCount = new
AtomicInteger(numberOfFailureRuns);
+ return (actualJobId, executor) -> {
+ actualJobIds.add(actualJobId);
+ if (failureRunCount.getAndDecrement() > 0) {
+ return FutureUtils.completedExceptionally(
+ new RuntimeException("Expected RuntimeException"));
+ }
+
+ return FutureUtils.completedVoidFuture();
+ };
+ }
+
+ private interface CleanupCallback extends BiFunction<JobID, Executor,
CompletableFuture<Void>> {
+ // empty interface to remove necessity use generics all the time
}
- private static class CleanupCallback {
+ private static class SingleCallCleanup implements CleanupCallback {
private final CompletableFuture<Void> resultFuture = new
CompletableFuture<>();
private JobID jobId;
private final Consumer<CompletableFuture<Void>> internalFunction;
- public static CleanupCallback withCompletionOnCleanup() {
- return new CleanupCallback(resultFuture ->
resultFuture.complete(null));
+ public static SingleCallCleanup withCompletionOnCleanup() {
+ return new SingleCallCleanup(resultFuture ->
resultFuture.complete(null));
}
- public static CleanupCallback withoutCompletionOnCleanup() {
- return new CleanupCallback(ignoredResultFuture -> {});
+ public static SingleCallCleanup withoutCompletionOnCleanup() {
+ return new SingleCallCleanup(ignoredResultFuture -> {});
}
- private CleanupCallback(Consumer<CompletableFuture<Void>>
internalFunction) {
+ private SingleCallCleanup(Consumer<CompletableFuture<Void>>
internalFunction) {
this.internalFunction = internalFunction;
}
- public CompletableFuture<Void> cleanup(JobID jobId, Executor executor)
{
+ public CompletableFuture<Void> apply(JobID jobId, Executor executor) {
Preconditions.checkState(this.jobId == null);
this.jobId = jobId;
Review comment:
This should always be executed in the main thread.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleanerTest.java
##########
@@ -181,33 +212,97 @@ public void
testMediumPriorityCleanupBlocksAllLowerPrioritizedCleanups() {
assertThat(noPriorityCleanup1.isDone()).isTrue();
}
+ @Test
+ public void testCleanupWithRetries() {
+ final Collection<JobID> actualJobIds = new ArrayList<>();
+ final CleanupCallback cleanupWithRetries =
cleanupWithRetry(actualJobIds, 2);
+ final SingleCallCleanup oneRunCleanup =
SingleCallCleanup.withCompletionOnCleanup();
+
+ final CompletableFuture<Void> compositeCleanupResult =
+ createTestInstanceBuilder(new FixedRetryStrategy(2,
Duration.ZERO))
+ .withRegularCleanup(cleanupWithRetries)
+ .withRegularCleanup(oneRunCleanup)
+ .build()
+ .cleanupAsync(JOB_ID);
+
+
assertThat(compositeCleanupResult).succeedsWithin(Duration.ofMillis(100));
+
+ assertThat(oneRunCleanup.getProcessedJobId()).isEqualTo(JOB_ID);
+ assertThat(oneRunCleanup.isDone()).isTrue();
+ assertThat(actualJobIds).containsExactly(JOB_ID, JOB_ID, JOB_ID);
+ }
+
+ @Test
+ public void testCleanupWithSingleRetryInHighPriorityTask() {
+ final Collection<JobID> actualJobIds = new ArrayList<>();
+ final CleanupCallback cleanupWithRetry =
cleanupWithRetry(actualJobIds, 1);
+ final SingleCallCleanup oneRunCleanup =
SingleCallCleanup.withCompletionOnCleanup();
+
+ final CompletableFuture<Void> compositeCleanupResult =
+ createTestInstanceBuilder(new FixedRetryStrategy(1,
Duration.ZERO))
+ .withPrioritizedCleanup(cleanupWithRetry)
Review comment:
Do we still need prioritized cleanups?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]