This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
     new e5fb97e305d [FLINK-32226][client] Cleanup jobgraph file on submission failure
e5fb97e305d is described below

commit e5fb97e305d6f23bdd0fae6484cb2f6c5c2dcd1d
Author: Chesnay Schepler <[email protected]>
AuthorDate: Wed May 31 13:36:14 2023 +0200

    [FLINK-32226][client] Cleanup jobgraph file on submission failure
---
 .../client/program/rest/RestClusterClient.java     | 35 +++++++++++++++++++---
 .../client/program/rest/RestClusterClientTest.java | 29 +++++++++++++++++-
 2 files changed, 59 insertions(+), 5 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index a05350d5e58..2ebcc14d5fe 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -158,6 +158,7 @@ public class RestClusterClient<T> implements 
ClusterClient<T> {
     private static final Logger LOG = 
LoggerFactory.getLogger(RestClusterClient.class);
 
     private final RestClusterClientConfiguration 
restClusterClientConfiguration;
+    private final java.nio.file.Path tempDir;
 
     private final Configuration configuration;
 
@@ -194,7 +195,13 @@ public class RestClusterClient<T> implements 
ClusterClient<T> {
     public RestClusterClient(
             Configuration config, T clusterId, 
ClientHighAvailabilityServicesFactory factory)
             throws Exception {
-        this(config, null, clusterId, new ExponentialWaitStrategy(10L, 2000L), 
factory);
+        this(
+                config,
+                null,
+                clusterId,
+                new ExponentialWaitStrategy(10L, 2000L),
+                factory,
+                Files.createTempDirectory("flink-rest-client-jobgraphs"));
     }
 
     @VisibleForTesting
@@ -209,7 +216,24 @@ public class RestClusterClient<T> implements 
ClusterClient<T> {
                 restClient,
                 clusterId,
                 waitStrategy,
-                DefaultClientHighAvailabilityServicesFactory.INSTANCE);
+                Files.createTempDirectory("flink-rest-client-jobgraphs"));
+    }
+
+    @VisibleForTesting
+    RestClusterClient(
+            Configuration configuration,
+            @Nullable RestClient restClient,
+            T clusterId,
+            WaitStrategy waitStrategy,
+            java.nio.file.Path tmpDir)
+            throws Exception {
+        this(
+                configuration,
+                restClient,
+                clusterId,
+                waitStrategy,
+                DefaultClientHighAvailabilityServicesFactory.INSTANCE,
+                tmpDir);
     }
 
     private RestClusterClient(
@@ -217,12 +241,14 @@ public class RestClusterClient<T> implements 
ClusterClient<T> {
             @Nullable RestClient restClient,
             T clusterId,
             WaitStrategy waitStrategy,
-            ClientHighAvailabilityServicesFactory clientHAServicesFactory)
+            ClientHighAvailabilityServicesFactory clientHAServicesFactory,
+            java.nio.file.Path tempDir)
             throws Exception {
         this.configuration = checkNotNull(configuration);
 
         this.restClusterClientConfiguration =
                 
RestClusterClientConfiguration.fromConfiguration(configuration);
+        this.tempDir = tempDir;
 
         if (restClient != null) {
             this.restClient = restClient;
@@ -328,7 +354,7 @@ public class RestClusterClient<T> implements 
ClusterClient<T> {
                         () -> {
                             try {
                                 final java.nio.file.Path jobGraphFile =
-                                        Files.createTempFile("flink-jobgraph", 
".bin");
+                                        Files.createTempFile(tempDir, 
"flink-jobgraph", ".bin");
                                 try (ObjectOutputStream objectOut =
                                         new ObjectOutputStream(
                                                 
Files.newOutputStream(jobGraphFile))) {
@@ -430,6 +456,7 @@ public class RestClusterClient<T> implements 
ClusterClient<T> {
                         });
 
         submissionFuture
+                .exceptionally(ignored -> null) // ignore errors
                 .thenCompose(ignored -> jobGraphFileFuture)
                 .thenAccept(
                         jobGraphFile -> {
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 6c091ebf9a7..2ff6dea2a98 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.io.network.partition.DataSetMetaInfo;
@@ -127,6 +128,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URL;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -148,6 +150,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -214,12 +217,18 @@ class RestClusterClientTest {
 
     private RestClusterClient<StandaloneClusterId> createRestClusterClient(
             int port, Configuration clientConfig) throws Exception {
+        return createRestClusterClient(port, clientConfig, 
Files.createTempDirectory("flink"));
+    }
+
+    private RestClusterClient<StandaloneClusterId> createRestClusterClient(
+            int port, Configuration clientConfig, Path tmpDir) throws 
Exception {
         clientConfig.setInteger(RestOptions.PORT, port);
         return new RestClusterClient<>(
                 clientConfig,
                 createRestClient(),
                 StandaloneClusterId.getInstance(),
-                (attempt) -> 0);
+                (attempt) -> 0,
+                tmpDir);
     }
 
     @Nonnull
@@ -974,6 +983,24 @@ class RestClusterClientTest {
         }
     }
 
+    @Test
+    void testJobGraphFileCleanedUpOnJobSubmissionFailure(@TempDir Path tmp) 
throws Exception {
+        try (final TestRestServerEndpoint restServerEndpoint =
+                createRestServerEndpoint(new SubmissionFailingHandler())) {
+            try (RestClusterClient<?> restClusterClient =
+                    createRestClusterClient(
+                            restServerEndpoint.getServerAddress().getPort(),
+                            new Configuration(restConfig),
+                            tmp)) {
+                assertThatThrownBy(() -> 
restClusterClient.submitJob(jobGraph).join())
+                        .hasCauseInstanceOf(JobSubmissionException.class);
+                try (Stream<Path> files = Files.list(tmp)) {
+                    assertThat(files).isEmpty();
+                }
+            }
+        }
+    }
+
     private final class SubmissionFailingHandler
             extends TestHandler<
                     JobSubmitRequestBody, JobSubmitResponseBody, 
EmptyMessageParameters> {

Reply via email to