zentol commented on a change in pull request #16881:
URL: https://github.com/apache/flink/pull/16881#discussion_r695491782



##########
File path: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
##########
@@ -372,16 +374,55 @@ public void close() {
 
         final CompletableFuture<JobSubmitResponseBody> submissionFuture =
                 requestFuture.thenCompose(
-                        requestAndFileUploads ->
-                                sendRetriableRequest(
-                                        JobSubmitHeaders.getInstance(),
-                                        EmptyMessageParameters.getInstance(),
-                                        requestAndFileUploads.f0,
-                                        requestAndFileUploads.f1,
-                                        isConnectionProblemOrServiceUnavailable()));
+                        requestAndFileUploads -> {
+                            LOG.info(
+                                    "Submitting job '{}' ({}).",
+                                    jobGraph.getName(),
+                                    jobGraph.getJobID());
+                            final int retryMaxAttempts =
+                                    restClusterClientConfiguration.getRetryMaxAttempts();
+                            final long retryDelay = restClusterClientConfiguration.getRetryDelay();
+                            final AtomicInteger failedAttempts = new AtomicInteger(0);
+                            return sendRetriableRequest(
+                                    JobSubmitHeaders.getInstance(),
+                                    EmptyMessageParameters.getInstance(),
+                                    requestAndFileUploads.f0,
+                                    requestAndFileUploads.f1,
+                                    isConnectionProblemOrServiceUnavailable(),
+                                    (receiver, error) -> {
+                                        if (error != null) {
+                                            final int performedRetries =
+                                                    failedAttempts.getAndIncrement();
+                                            if (performedRetries < retryMaxAttempts) {
+                                                LOG.warn(
+                                                        "Unable to submit job '{}' ({}) to '{}'. Scheduling retry [attempt={}, maxAttempts={}, delay={}ms].",

Review comment:
The main issue is that we are now making assumptions here about how `sendRetriableRequest` works. For example, we assume that it uses a fixed-delay retry strategy, and we are effectively counting retry attempts in two places. That's a bit eh.
   
Ideally `FutureUtils` would provide us with some hook for logging, but that would quickly get out of hand, because we then _should_ touch all of the `retry*` variants.
   
So I suggest we forget what I said earlier about all the retry stuff.
   
If the operation fails, whether that failure leads to a retry or is final, we just log a warning saying `"Attempt to submit job [...] failed."`. This phrasing works for both cases.

Whether the failure to submit is really an _error_ is something the user can decide, based on how they handle the returned future.
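
To make the suggestion concrete, here is a minimal, self-contained sketch. It is not the actual `RestClusterClient` code: the `retry` helper is a hypothetical stand-in for `sendRetriableRequest`, the per-attempt listener parameter is assumed from the diff above, and the job name and ID are made up. The point is only that the listener logs the same warning for every failed attempt and keeps no counter of its own, so it does not depend on how the retry strategy works.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

public class SubmitLoggingSketch {

    /**
     * Hypothetical stand-in for a retrying helper such as sendRetriableRequest.
     * It owns the retry policy; the listener is told about each attempt's
     * outcome but knows nothing about counts or delays.
     */
    static <T> CompletableFuture<T> retry(
            Supplier<CompletableFuture<T>> operation,
            int retriesLeft,
            BiConsumer<T, Throwable> attemptListener) {
        return operation
                .get()
                .whenComplete(attemptListener)
                .<CompletableFuture<T>>handle(
                        (value, error) -> {
                            if (error == null) {
                                return CompletableFuture.completedFuture(value);
                            }
                            if (retriesLeft > 0) {
                                return retry(operation, retriesLeft - 1, attemptListener);
                            }
                            // Retries exhausted: propagate the last failure to the caller.
                            CompletableFuture<T> failed = new CompletableFuture<>();
                            failed.completeExceptionally(error);
                            return failed;
                        })
                .thenCompose(future -> future);
    }

    /** Runs a submission that fails twice, then succeeds, and collects the warnings. */
    static List<String> demo() {
        List<String> warnings = new ArrayList<>(); // stands in for LOG.warn
        AtomicInteger calls = new AtomicInteger();

        Supplier<CompletableFuture<String>> flakySubmit =
                () -> {
                    CompletableFuture<String> attempt = new CompletableFuture<>();
                    if (calls.incrementAndGet() < 3) {
                        attempt.completeExceptionally(new RuntimeException("service unavailable"));
                    } else {
                        attempt.complete("submitted");
                    }
                    return attempt;
                };

        String result =
                retry(
                                flakySubmit,
                                5,
                                (response, error) -> {
                                    if (error != null) {
                                        // Same wording whether a retry follows or not.
                                        warnings.add("Attempt to submit job 'demo' (job-1) failed.");
                                    }
                                })
                        .join();

        warnings.add("result=" + result);
        return warnings;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

If the retries were exhausted instead, the same warning would be logged for the last attempt and the returned future would complete exceptionally, leaving it to the caller to decide whether that counts as an error.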



