This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c5352fc55972420ed5bf1afdfd97834540b1407a Author: Matthias Pohl <[email protected]> AuthorDate: Fri Mar 4 21:51:30 2022 +0100 [FLINK-26494][runtime] Adds log message to cleanup failure --- .../dispatcher/cleanup/DefaultResourceCleaner.java | 102 +++++++++++++++++---- .../cleanup/DispatcherResourceCleanerFactory.java | 27 ++++-- .../cleanup/DefaultResourceCleanerTest.java | 41 +++++---- 3 files changed, 125 insertions(+), 45 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleaner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleaner.java index 6226f74..77c40d2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleaner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleaner.java @@ -24,6 +24,9 @@ import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.concurrent.RetryStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.CompletableFuture; @@ -37,12 +40,14 @@ import java.util.stream.Collectors; */ public class DefaultResourceCleaner<T> implements ResourceCleaner { + private static final Logger LOG = LoggerFactory.getLogger(DefaultResourceCleaner.class); + private final ComponentMainThreadExecutor mainThreadExecutor; private final Executor cleanupExecutor; private final CleanupFn<T> cleanupFn; - private final Collection<T> prioritizedCleanup; - private final Collection<T> regularCleanup; + private final Collection<CleanupWithLabel<T>> prioritizedCleanup; + private final Collection<CleanupWithLabel<T>> regularCleanup; private final RetryStrategy retryStrategy; @@ -97,8 +102,8 @@ public class DefaultResourceCleaner<T> implements ResourceCleaner { private final RetryStrategy retryStrategy; - private final Collection<T> prioritizedCleanup = new ArrayList<>(); - private final Collection<T> regularCleanup = new ArrayList<>(); + private final Collection<CleanupWithLabel<T>> prioritizedCleanup = new ArrayList<>(); + private final Collection<CleanupWithLabel<T>> regularCleanup = new ArrayList<>(); private Builder( ComponentMainThreadExecutor mainThreadExecutor, @@ -117,10 +122,13 @@ public class DefaultResourceCleaner<T> implements ResourceCleaner { * resources are added matters, i.e. if two cleanable resources are added as prioritized * cleanup tasks, the resource being added first will block the cleanup of the second * resource. All prioritized cleanup resources will run and finish before any resource that - * is added using {@link #withRegularCleanup(Object)} is started. + * is added using {@link #withRegularCleanup(String, Object)} is started. + * + * @param label The label being used when logging errors in the given cleanup. + * @param prioritizedCleanup The cleanup callback that is going to be prioritized. */ - public Builder<T> withPrioritizedCleanup(T prioritizedCleanup) { - this.prioritizedCleanup.add(prioritizedCleanup); + public Builder<T> withPrioritizedCleanup(String label, T prioritizedCleanup) { + this.prioritizedCleanup.add(new CleanupWithLabel<>(prioritizedCleanup, label)); return this; } @@ -128,10 +136,13 @@ public class DefaultResourceCleaner<T> implements ResourceCleaner { * Regular cleanups are resources for which the cleanup is triggered after all prioritized * cleanups succeeded. All added regular cleanups will run concurrently to each other. * - * @see #withPrioritizedCleanup(Object) + * @param label The label being used when logging errors in the given cleanup. + * @param regularCleanup The cleanup callback that is going to run after all prioritized + * cleanups are finished. + * @see #withPrioritizedCleanup(String, Object) */ - public Builder<T> withRegularCleanup(T regularCleanup) { - this.regularCleanup.add(regularCleanup); + public Builder<T> withRegularCleanup(String label, T regularCleanup) { + this.regularCleanup.add(new CleanupWithLabel<>(regularCleanup, label)); return this; } @@ -150,8 +161,8 @@ public class DefaultResourceCleaner<T> implements ResourceCleaner { ComponentMainThreadExecutor mainThreadExecutor, Executor cleanupExecutor, CleanupFn<T> cleanupFn, - Collection<T> prioritizedCleanup, - Collection<T> regularCleanup, + Collection<CleanupWithLabel<T>> prioritizedCleanup, + Collection<CleanupWithLabel<T>> regularCleanup, RetryStrategy retryStrategy) { this.mainThreadExecutor = mainThreadExecutor; this.cleanupExecutor = cleanupExecutor; @@ -166,22 +177,79 @@ public class DefaultResourceCleaner<T> implements ResourceCleaner { mainThreadExecutor.assertRunningInMainThread(); CompletableFuture<Void> cleanupFuture = FutureUtils.completedVoidFuture(); - for (T cleanup : prioritizedCleanup) { - cleanupFuture = cleanupFuture.thenCompose(ignoredValue -> withRetry(jobId, cleanup)); + for (CleanupWithLabel<T> cleanupWithLabel : prioritizedCleanup) { + cleanupFuture = + cleanupFuture.thenCompose( + ignoredValue -> + withRetry( + jobId, + cleanupWithLabel.getLabel(), + cleanupWithLabel.getCleanup())); } return cleanupFuture.thenCompose( ignoredValue -> FutureUtils.completeAll( regularCleanup.stream() - .map(cleanup -> withRetry(jobId, cleanup)) + .map( + cleanupWithLabel -> + withRetry( + jobId, + cleanupWithLabel.getLabel(), + cleanupWithLabel.getCleanup())) .collect(Collectors.toList()))); } - private CompletableFuture<Void> withRetry(JobID jobId, T cleanup) { + private CompletableFuture<Void> withRetry(JobID jobId, String label, T cleanup) { return FutureUtils.retryWithDelay( - () -> cleanupFn.cleanupAsync(cleanup, jobId, cleanupExecutor), + () -> + cleanupFn + .cleanupAsync(cleanup, jobId, cleanupExecutor) + .whenComplete( + (value, throwable) -> { + if (throwable != null) { + final String logMessage = + String.format( + "Cleanup of %s failed for job %s due to a %s: %s", + label, + jobId, + throwable + .getClass() + .getSimpleName(), + throwable.getMessage()); + if (LOG.isTraceEnabled()) { + LOG.warn(logMessage, throwable); + } else { + LOG.warn(logMessage); + } + } + }), retryStrategy, mainThreadExecutor); } + + /** + * {@code CleanupWithLabel} makes it possible to attach a label to a given cleanup that can be + * used as human-readable representation of the corresponding cleanup. + * + * @param <CLEANUP_TYPE> The type of cleanup. + */ + private static class CleanupWithLabel<CLEANUP_TYPE> { + + private final CLEANUP_TYPE cleanup; + private final String label; + + public CleanupWithLabel(CLEANUP_TYPE cleanup, String label) { + this.cleanup = cleanup; + this.label = label; + } + + public CLEANUP_TYPE getCleanup() { + return cleanup; + } + + public String getLabel() { + return label; + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java index 3fd6dc4..f14d7aa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java @@ -43,6 +43,12 @@ import java.util.concurrent.Executor; */ public class DispatcherResourceCleanerFactory implements ResourceCleanerFactory { + private static final String JOB_MANAGER_RUNNER_REGISTRY_LABEL = "JobManagerRunnerRegistry"; + private static final String JOB_GRAPH_STORE_LABEL = "JobGraphStore"; + private static final String BLOB_SERVER_LABEL = "BlobServer"; + private static final String HA_SERVICES_LABEL = "HighAvailabilityServices"; + private static final String JOB_MANAGER_METRIC_GROUP_LABEL = "JobManagerMetricGroup"; + private final Executor cleanupExecutor; private final RetryStrategy retryStrategy; @@ -89,10 +95,10 @@ public class DispatcherResourceCleanerFactory implements ResourceCleanerFactory ComponentMainThreadExecutor mainThreadExecutor) { return DefaultResourceCleaner.forLocallyCleanableResources( mainThreadExecutor, cleanupExecutor, retryStrategy) - .withPrioritizedCleanup(jobManagerRunnerRegistry) - .withRegularCleanup(jobGraphWriter) - .withRegularCleanup(blobServer) - .withRegularCleanup(jobManagerMetricGroup) + .withPrioritizedCleanup(JOB_MANAGER_RUNNER_REGISTRY_LABEL, jobManagerRunnerRegistry) + .withRegularCleanup(JOB_GRAPH_STORE_LABEL, jobGraphWriter) + .withRegularCleanup(BLOB_SERVER_LABEL, blobServer) + .withRegularCleanup(JOB_MANAGER_METRIC_GROUP_LABEL, jobManagerMetricGroup) .build(); } @@ -101,11 +107,14 @@ public class DispatcherResourceCleanerFactory implements ResourceCleanerFactory ComponentMainThreadExecutor mainThreadExecutor) { return DefaultResourceCleaner.forGloballyCleanableResources( mainThreadExecutor, cleanupExecutor, retryStrategy) - .withPrioritizedCleanup(ofLocalResource(jobManagerRunnerRegistry)) - .withRegularCleanup(jobGraphWriter) - .withRegularCleanup(blobServer) - .withRegularCleanup(highAvailabilityServices) - .withRegularCleanup(ofLocalResource(jobManagerMetricGroup)) + .withPrioritizedCleanup( + JOB_MANAGER_RUNNER_REGISTRY_LABEL, + ofLocalResource(jobManagerRunnerRegistry)) + .withRegularCleanup(JOB_GRAPH_STORE_LABEL, jobGraphWriter) + .withRegularCleanup(BLOB_SERVER_LABEL, blobServer) + .withRegularCleanup(HA_SERVICES_LABEL, highAvailabilityServices) + .withRegularCleanup( + JOB_MANAGER_METRIC_GROUP_LABEL, ofLocalResource(jobManagerMetricGroup)) .build(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleanerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleanerTest.java index df16df7..39d332d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleanerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleanerTest.java @@ -34,6 +34,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; @@ -61,8 +62,8 @@ public class DefaultResourceCleanerTest { final CompletableFuture<Void> cleanupResult = createTestInstanceBuilder() - .withRegularCleanup(cleanup0) - .withRegularCleanup(cleanup1) + .withRegularCleanup("Reg #0", cleanup0) + .withRegularCleanup("Reg #1", cleanup1) .build() .cleanupAsync(JOB_ID); @@ -84,8 +85,8 @@ public class DefaultResourceCleanerTest { final CompletableFuture<Void> cleanupResult = createTestInstanceBuilder() - .withRegularCleanup(cleanup0) - .withRegularCleanup(cleanup1) + .withRegularCleanup("Reg #0", cleanup0) + .withRegularCleanup("Reg #1", cleanup1) .build() .cleanupAsync(JOB_ID); @@ -105,6 +106,7 @@ public class DefaultResourceCleanerTest { .hasExactlyElementsOfTypes( ExecutionException.class, FutureUtils.RetryException.class, + CompletionException.class, expectedException.getClass()) .last() .isEqualTo(expectedException); @@ -117,8 +119,8 @@ public class DefaultResourceCleanerTest { final CompletableFuture<Void> cleanupResult = createTestInstanceBuilder() - .withRegularCleanup(cleanup0) - .withRegularCleanup(cleanup1) + .withRegularCleanup("Reg #0", cleanup0) + .withRegularCleanup("Reg #1", cleanup1) .build() .cleanupAsync(JOB_ID); @@ -138,6 +140,7 @@ public class DefaultResourceCleanerTest { .hasExactlyElementsOfTypes( ExecutionException.class, FutureUtils.RetryException.class, + CompletionException.class, expectedException.getClass()) .last() .isEqualTo(expectedException); @@ -154,10 +157,10 @@ public class DefaultResourceCleanerTest { final DefaultResourceCleaner<CleanupCallback> testInstance = createTestInstanceBuilder() - .withPrioritizedCleanup(highPriorityCleanup) - .withPrioritizedCleanup(lowerThanHighPriorityCleanup) - .withRegularCleanup(noPriorityCleanup0) - .withRegularCleanup(noPriorityCleanup1) + .withPrioritizedCleanup("Prio #0", highPriorityCleanup) + .withPrioritizedCleanup("Prio #1", lowerThanHighPriorityCleanup) + .withRegularCleanup("Reg #0", noPriorityCleanup0) + .withRegularCleanup("Reg #1", noPriorityCleanup1) .build(); final CompletableFuture<Void> overallCleanupResult = testInstance.cleanupAsync(JOB_ID); @@ -189,10 +192,10 @@ public class DefaultResourceCleanerTest { final DefaultResourceCleaner<CleanupCallback> testInstance = createTestInstanceBuilder() - .withPrioritizedCleanup(highPriorityCleanup) - .withPrioritizedCleanup(lowerThanHighPriorityCleanup) - .withRegularCleanup(noPriorityCleanup0) - .withRegularCleanup(noPriorityCleanup1) + .withPrioritizedCleanup("Prio #0", highPriorityCleanup) + .withPrioritizedCleanup("Prio #1", lowerThanHighPriorityCleanup) + .withRegularCleanup("Reg #0", noPriorityCleanup0) + .withRegularCleanup("Reg #1", noPriorityCleanup1) .build(); assertThat(highPriorityCleanup.isDone()).isFalse(); @@ -224,8 +227,8 @@ public class DefaultResourceCleanerTest { final CompletableFuture<Void> compositeCleanupResult = createTestInstanceBuilder(TestingRetryStrategies.createWithNumberOfRetries(2)) - .withRegularCleanup(cleanupWithRetries) - .withRegularCleanup(oneRunCleanup) + .withRegularCleanup("Reg #0", cleanupWithRetries) + .withRegularCleanup("Reg #1", oneRunCleanup) .build() .cleanupAsync(JOB_ID); @@ -246,9 +249,9 @@ public class DefaultResourceCleanerTest { final CompletableFuture<Void> compositeCleanupResult = createTestInstanceBuilder(TestingRetryStrategies.createWithNumberOfRetries(1)) - .withPrioritizedCleanup(cleanupWithRetry) - .withPrioritizedCleanup(oneRunHigherPriorityCleanup) - .withRegularCleanup(oneRunCleanup) + .withPrioritizedCleanup("Prio #0", cleanupWithRetry) + .withPrioritizedCleanup("Prio #1", oneRunHigherPriorityCleanup) + .withRegularCleanup("Reg #0", oneRunCleanup) .build() .cleanupAsync(JOB_ID);
