zentol commented on a change in pull request #19102:
URL: https://github.com/apache/flink/pull/19102#discussion_r827129983



##########
File path: docs/content.zh/docs/deployment/overview.md
##########
@@ -158,6 +158,11 @@ Once a job has reached a globally terminal state of either finished, failed or c
 external component resources associated with the job are then cleaned up. In the event of a
 failure when cleaning up a resource, Flink will attempt to retry the cleanup. You can
 [configure]({{< ref "docs/deployment/config#retryable-cleanup" >}}) the retry strategy used.
+Reaching the maximum number of retries without succeeding will leave the job in a dirty state.
+Its artifacts (and its [JobResultStore]({{< ref "docs/deployment/ha/overview#jobresultstore" >}})
+entry like in Application Mode) would need to be cleaned up manually. Restarting the very same

Review comment:
       ```suggestion
   entry) would need to be cleaned up manually. Restarting the very same
   ```
   Reads a bit weird, and I'm not sure why application mode should be explicitly mentioned.

##########
File path: docs/content/docs/deployment/overview.md
##########
@@ -158,7 +158,12 @@ When deploying Flink, there are often multiple options available for each buildi
 Once a job has reached a globally terminal state of either finished, failed or cancelled, the
 external component resources associated with the job are then cleaned up. In the event of a
 failure when cleaning up a resource, Flink will attempt to retry the cleanup. You can
-[configure]({{< ref "docs/deployment/config#retryable-cleanup" >}}) the retry strategy used.
+[configure]({{< ref "docs/deployment/config#retryable-cleanup" >}}) the retry strategy used.
+Reaching the maximum number of retries without succeeding will leave the job in a dirty state.

Review comment:
       Do we have any documentation that explains the consequences of being in a dirty state?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -615,7 +615,16 @@ private void runJob(JobManagerRunner jobManagerRunner, ExecutionType executionTy
 
         final CompletableFuture<Void> jobTerminationFuture =
                 cleanupJobStateFuture.thenCompose(
-                        cleanupJobState -> removeJob(jobId, cleanupJobState));
+                        cleanupJobState ->
+                                removeJob(jobId, cleanupJobState)
+                                        .exceptionally(
+                                                throwable -> {
+                                                    log.warn(
+                                                            "The cleanup of job {} failed. The job's artifacts and its JobResultStore entry needs to be cleaned manually.",
+                                                            jobId,

Review comment:
       Would it be possible to list the paths that need to be cleaned up?
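
One way to address this would be to pass the affected locations into the warning itself. The following is only a sketch in plain Java, not Flink code: the class `CleanupWarningSketch`, its methods, and the example paths are all hypothetical, and how the Dispatcher would actually collect those paths is left open.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: none of these names exist in Flink.
class CleanupWarningSketch {

    /** Builds a warning that names every location the user has to clean up. */
    static String buildWarning(String jobId, List<String> leftoverPaths) {
        return String.format(
                "The cleanup of job %s failed. The following locations need to be cleaned manually: %s",
                jobId, String.join(", ", leftoverPaths));
    }

    /**
     * Mirrors the exceptionally(...) pattern from the diff: a failed cleanup is
     * swallowed and turned into a warning text instead of failing the future.
     */
    static CompletableFuture<String> cleanupOrWarn(
            CompletableFuture<Void> cleanup, String jobId, List<String> leftoverPaths) {
        return cleanup
                .thenApply(ignored -> "Cleanup of job " + jobId + " succeeded.")
                .exceptionally(throwable -> buildWarning(jobId, leftoverPaths));
    }

    public static void main(String[] args) {
        // Simulate a cleanup that has exhausted its retries.
        CompletableFuture<Void> failedCleanup = new CompletableFuture<>();
        failedCleanup.completeExceptionally(new RuntimeException("Unable to remove job graph."));

        // The paths below are placeholders for illustration only.
        List<String> paths = List.of("/hypothetical/ha/job-1", "/hypothetical/blob/job-1");
        System.out.println(cleanupOrWarn(failedCleanup, "job-1", paths).join());
    }
}
```

In the real change the message would go through `log.warn` rather than being returned; returning it here only keeps the sketch easy to run and check.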

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
##########
@@ -288,7 +304,7 @@ public void testCleanupAfterLeadershipChange() throws Exception {
         dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
 
         waitForJobToFinish(leaderElectionService, dispatcherGateway, jobId);
-        jobGraphRemovalErrorReceived.await();
+        firstCleanupFailsLatch.trigger();

Review comment:
       I don't see where we guarantee that the cleanup isn't re-run immediately, finishing before the leadership has changed.
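
To make the concern concrete: a retry could be held back by a second gate that the test opens only after the leadership change. This is a minimal sketch in plain Java under that assumption, not the test's actual code; `GatedCleanupSketch` and its fields are hypothetical, and `CountDownLatch` stands in for Flink's `OneShotLatch`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a stand-in for the global cleanup function of a test store.
class GatedCleanupSketch {
    final AtomicInteger callCount = new AtomicInteger();
    // Opened by the test once the leadership change has happened.
    final CountDownLatch leadershipChanged = new CountDownLatch(1);

    /** First call fails at once; every later call waits for the leadership change. */
    CompletableFuture<Void> cleanup() {
        if (callCount.getAndIncrement() == 0) {
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(
                    new RuntimeException("Expected RuntimeException: Unable to remove job graph."));
            return failed;
        }
        // A retry cannot complete until the gate is opened, even if it starts immediately.
        return CompletableFuture.runAsync(
                () -> {
                    try {
                        leadershipChanged.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException(e);
                    }
                });
    }

    public static void main(String[] args) {
        GatedCleanupSketch store = new GatedCleanupSketch();
        System.out.println("first attempt failed: " + store.cleanup().isCompletedExceptionally());

        CompletableFuture<Void> retry = store.cleanup();
        System.out.println("retry done before leadership change: " + retry.isDone());

        store.leadershipChanged.countDown(); // simulate the leadership change
        retry.join();
        System.out.println("retry done after leadership change: " + retry.isDone());
    }
}
```

With a gate like this the ordering no longer depends on timing: the retry may start right away, but it can only finish after the test has revoked and regranted leadership.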

##########
File path: docs/content/docs/deployment/overview.md
##########
@@ -158,7 +158,12 @@ When deploying Flink, there are often multiple options available for each buildi
 Once a job has reached a globally terminal state of either finished, failed or cancelled, the
 external component resources associated with the job are then cleaned up. In the event of a
 failure when cleaning up a resource, Flink will attempt to retry the cleanup. You can
-[configure]({{< ref "docs/deployment/config#retryable-cleanup" >}}) the retry strategy used.
+[configure]({{< ref "docs/deployment/config#retryable-cleanup" >}}) the retry strategy used.
+Reaching the maximum number of retries without succeeding will leave the job in a dirty state.
+Its artifacts (and its [JobResultStore]({{< ref "docs/deployment/ha/overview#jobresultstore" >}})
+entry like in Application Mode) would need to be cleaned up manually. Restarting the very same
+job (i.e. using the same job ID) would result in the retryable cleanup being picked up again

Review comment:
       ```suggestion
   job (i.e. using the same job ID) will result in the cleanup being restarted 
   ```

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
##########
@@ -249,11 +259,30 @@ public void testCleanupAfterLeadershipChange() throws Exception {
 
         // Construct job graph store.
         final AtomicInteger actualGlobalCleanupCallCount = new AtomicInteger();
+        final OneShotLatch firstCleanupFailsLatch = new OneShotLatch();
         final OneShotLatch successfulCleanupLatch = new OneShotLatch();
-        final RuntimeException temporaryError = new RuntimeException("Unable to remove job graph.");
         final JobGraphStore jobGraphStore =
-                createAndStartJobGraphStoreWithCleanupFailures(
-                        1, temporaryError, actualGlobalCleanupCallCount, successfulCleanupLatch);
+                TestingJobGraphStore.newBuilder()
+                        .setGlobalCleanupFunction(
+                                (ignoredJobId, ignoredExecutor) -> {
+                                    try {
+                                        firstCleanupFailsLatch.await();
+                                    } catch (InterruptedException e) {
+                                        throw new CompletionException(e);
+                                    }
+
+                                    if (actualGlobalCleanupCallCount.getAndIncrement() < 1) {
+                                        return FutureUtils.completedExceptionally(
+                                                new RuntimeException(
+                                                        "Expected RuntimeException: Unable to remove job graph."));
+                                    }
+
+                                    successfulCleanupLatch.trigger();
+                                    return FutureUtils.completedVoidFuture();
+                                })
+                        .build();
+
+        jobGraphStore.start(null);

Review comment:
       see above

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
##########
@@ -133,12 +132,23 @@ public void testCleanupThroughRetries() throws Exception {
         final int numberOfErrors = 5;
         final RuntimeException temporaryError =
                 new RuntimeException("Expected RuntimeException: Unable to remove job graph.");
+        final AtomicInteger failureCount = new AtomicInteger(numberOfErrors);
         final JobGraphStore jobGraphStore =
-                createAndStartJobGraphStoreWithCleanupFailures(
-                        numberOfErrors,
-                        temporaryError,
-                        actualGlobalCleanupCallCount,
-                        successfulCleanupLatch);
+                TestingJobGraphStore.newBuilder()
+                        .setGlobalCleanupFunction(
+                                (ignoredJobId, ignoredExecutor) -> {
+                                    actualGlobalCleanupCallCount.incrementAndGet();
+
+                                    if (failureCount.getAndDecrement() > 0) {
+                                        return FutureUtils.completedExceptionally(temporaryError);
+                                    }
+
+                                    successfulCleanupLatch.trigger();
+                                    return FutureUtils.completedVoidFuture();
+                                })
+                        .build();
+
+        jobGraphStore.start(null);

Review comment:
       ```suggestion
           jobGraphStore.start(NoOpJobGraphListener.INSTANCE);
   ```



