SteNicholas commented on code in PR #237:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/237#discussion_r881766813


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java:
##########
@@ -618,32 +623,51 @@ public SavepointFetchResult fetchSavepointInfo(
             savepointStatusMessageParameters.jobIdPathParameter.resolve(JobID.fromHexString(jobId));
             savepointStatusMessageParameters.triggerIdPathParameter.resolve(
                     TriggerId.fromHexString(triggerId));
-            CompletableFuture<AsynchronousOperationResult<SavepointInfo>> response =
-                    clusterClient.sendRequest(
-                            savepointStatusHeaders,
-                            savepointStatusMessageParameters,
-                            EmptyRequestBody.getInstance());
-
-            if (response.get() == null || response.get().resource() == null) {
-                return SavepointFetchResult.pending();
-            }
-
-            if (response.get().resource().getLocation() == null) {
-                if (response.get().resource().getFailureCause() != null) {
-                    LOG.error(
-                            "Failure occurred while fetching the savepoint result",
-                            response.get().resource().getFailureCause());
-                    return SavepointFetchResult.error(
-                            response.get().resource().getFailureCause().toString());
-                } else {
+            while (true) {
+                CompletableFuture<AsynchronousOperationResult<SavepointInfo>> response =
+                        clusterClient.sendRequest(
+                                savepointStatusHeaders,
+                                savepointStatusMessageParameters,
+                                EmptyRequestBody.getInstance());
+
+                if (response.get() == null || response.get().resource() == null) {
                     return SavepointFetchResult.pending();
                 }
+
+                if (response.get().resource().getLocation() == null) {
+                    // If the failure cause contains 'Not all required tasks are currently
+                    // running.', then continue until savepoint is successfully fetched.
+                    if (response.get().resource().getFailureCause() != null) {

Review Comment:
   @morhidi, the main idea is that if the job status is running but we get the
error `Aborting checkpoint. Failure reason: Not all required tasks are currently
running.`, we could retry triggering the checkpoint until it is triggered
successfully. IMO, this error can occur while certain tasks are still being
initialized.
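
A minimal sketch of that retry idea, written against plain Java rather than the operator's classes. Everything named here (`SavepointRetrySketch`, `Result`, `pollWithRetry`, `maxAttempts`, `backoffMillis`) is hypothetical and not part of the PR; only the error-message fragment comes from the comment above. It also assumes the retry should be bounded and paused between attempts, which the `while (true)` loop in the diff does not do.

```java
import java.util.function.Supplier;

/** Hypothetical sketch; none of these names exist in the PR or in Flink. */
final class SavepointRetrySketch {

    // Message fragment quoted in the review comment.
    static final String TRANSIENT_MARKER = "Not all required tasks are currently running";

    /** Hypothetical stand-in for a polled result: a location, a failure cause, or neither (pending). */
    static final class Result {
        final String location;
        final String failureCause;

        Result(String location, String failureCause) {
            this.location = location;
            this.failureCause = failureCause;
        }

        /** True only for the failure this comment proposes to retry on. */
        boolean isTransientFailure() {
            return location == null
                    && failureCause != null
                    && failureCause.contains(TRANSIENT_MARKER);
        }
    }

    /**
     * Polls until the result is not the transient failure, or until maxAttempts
     * polls have been made. Returns the last result seen in either case.
     */
    static Result pollWithRetry(Supplier<Result> poll, int maxAttempts, long backoffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Result last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            last = poll.get();
            if (!last.isTransientFailure()) {
                return last; // success, pending, or a non-retryable failure
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMillis);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop retrying.
                    Thread.currentThread().interrupt();
                    return last;
                }
            }
        }
        return last; // still the transient failure after maxAttempts polls
    }
}
```

With a bound like this, a job whose tasks never finish initializing surfaces the last failure to the caller instead of blocking the calling thread indefinitely.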



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

