This is an automated email from the ASF dual-hosted git repository.

wanglijie pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
     new 7c9e05ea8c6 [FLINK-32974][client] Avoid creating a new temporary 
directory every time for RestClusterClient
7c9e05ea8c6 is described below

commit 7c9e05ea8c67b12c657b60cd5e6d1bea52b4f9a3
Author: Lijie Wang <[email protected]>
AuthorDate: Wed Sep 20 12:04:25 2023 +0800

    [FLINK-32974][client] Avoid creating a new temporary directory every time 
for RestClusterClient
    
    This closes #23363
---
 .../client/program/rest/RestClusterClient.java     | 35 ++++------------------
 .../client/program/rest/RestClusterClientTest.java | 31 ++++++++++---------
 2 files changed, 22 insertions(+), 44 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 2ebcc14d5fe..a1ef5f7e0a9 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -158,7 +158,6 @@ public class RestClusterClient<T> implements 
ClusterClient<T> {
     private static final Logger LOG = 
LoggerFactory.getLogger(RestClusterClient.class);
 
     private final RestClusterClientConfiguration 
restClusterClientConfiguration;
-    private final java.nio.file.Path tempDir;
 
     private final Configuration configuration;
 
@@ -195,13 +194,7 @@ public class RestClusterClient<T> implements 
ClusterClient<T> {
     public RestClusterClient(
             Configuration config, T clusterId, 
ClientHighAvailabilityServicesFactory factory)
             throws Exception {
-        this(
-                config,
-                null,
-                clusterId,
-                new ExponentialWaitStrategy(10L, 2000L),
-                factory,
-                Files.createTempDirectory("flink-rest-client-jobgraphs"));
+        this(config, null, clusterId, new ExponentialWaitStrategy(10L, 2000L), 
factory);
     }
 
     @VisibleForTesting
@@ -216,24 +209,7 @@ public class RestClusterClient<T> implements 
ClusterClient<T> {
                 restClient,
                 clusterId,
                 waitStrategy,
-                Files.createTempDirectory("flink-rest-client-jobgraphs"));
-    }
-
-    @VisibleForTesting
-    RestClusterClient(
-            Configuration configuration,
-            @Nullable RestClient restClient,
-            T clusterId,
-            WaitStrategy waitStrategy,
-            java.nio.file.Path tmpDir)
-            throws Exception {
-        this(
-                configuration,
-                restClient,
-                clusterId,
-                waitStrategy,
-                DefaultClientHighAvailabilityServicesFactory.INSTANCE,
-                tmpDir);
+                DefaultClientHighAvailabilityServicesFactory.INSTANCE);
     }
 
     private RestClusterClient(
@@ -241,14 +217,12 @@ public class RestClusterClient<T> implements 
ClusterClient<T> {
             @Nullable RestClient restClient,
             T clusterId,
             WaitStrategy waitStrategy,
-            ClientHighAvailabilityServicesFactory clientHAServicesFactory,
-            java.nio.file.Path tempDir)
+            ClientHighAvailabilityServicesFactory clientHAServicesFactory)
             throws Exception {
         this.configuration = checkNotNull(configuration);
 
         this.restClusterClientConfiguration =
                 
RestClusterClientConfiguration.fromConfiguration(configuration);
-        this.tempDir = tempDir;
 
         if (restClient != null) {
             this.restClient = restClient;
@@ -354,7 +328,8 @@ public class RestClusterClient<T> implements 
ClusterClient<T> {
                         () -> {
                             try {
                                 final java.nio.file.Path jobGraphFile =
-                                        Files.createTempFile(tempDir, 
"flink-jobgraph", ".bin");
+                                        Files.createTempFile(
+                                                "flink-jobgraph-" + 
jobGraph.getJobID(), ".bin");
                                 try (ObjectOutputStream objectOut =
                                         new ObjectOutputStream(
                                                 
Files.newOutputStream(jobGraphFile))) {
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 2ff6dea2a98..7b44c6f469f 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -217,18 +217,12 @@ class RestClusterClientTest {
 
     private RestClusterClient<StandaloneClusterId> createRestClusterClient(
             int port, Configuration clientConfig) throws Exception {
-        return createRestClusterClient(port, clientConfig, 
Files.createTempDirectory("flink"));
-    }
-
-    private RestClusterClient<StandaloneClusterId> createRestClusterClient(
-            int port, Configuration clientConfig, Path tmpDir) throws 
Exception {
         clientConfig.setInteger(RestOptions.PORT, port);
         return new RestClusterClient<>(
                 clientConfig,
                 createRestClient(),
                 StandaloneClusterId.getInstance(),
-                (attempt) -> 0,
-                tmpDir);
+                (attempt) -> 0);
     }
 
     @Nonnull
@@ -984,23 +978,32 @@ class RestClusterClientTest {
     }
 
     @Test
-    void testJobGraphFileCleanedUpOnJobSubmissionFailure(@TempDir Path tmp) 
throws Exception {
+    void testJobGraphFileCleanedUpOnJobSubmissionFailure() throws Exception {
+        final Path jobGraphFileDir = getTempDir();
         try (final TestRestServerEndpoint restServerEndpoint =
                 createRestServerEndpoint(new SubmissionFailingHandler())) {
             try (RestClusterClient<?> restClusterClient =
-                    createRestClusterClient(
-                            restServerEndpoint.getServerAddress().getPort(),
-                            new Configuration(restConfig),
-                            tmp)) {
+                    
createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) {
                 assertThatThrownBy(() -> 
restClusterClient.submitJob(jobGraph).join())
                         .hasCauseInstanceOf(JobSubmissionException.class);
-                try (Stream<Path> files = Files.list(tmp)) {
-                    assertThat(files).isEmpty();
+                try (Stream<Path> files = Files.list(jobGraphFileDir)) {
+                    assertThat(files)
+                            .noneMatch(
+                                    path ->
+                                            path.toString()
+                                                    
.contains(jobGraph.getJobID().toString()));
                 }
             }
         }
     }
 
+    private static Path getTempDir() throws IOException {
+        Path tempFile = Files.createTempFile("test", ".bin");
+        Path tempDir = tempFile.getParent();
+        Files.delete(tempFile);
+        return tempDir;
+    }
+
     private final class SubmissionFailingHandler
             extends TestHandler<
                     JobSubmitRequestBody, JobSubmitResponseBody, 
EmptyMessageParameters> {

Reply via email to