This is an automated email from the ASF dual-hosted git repository.
wanglijie pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push:
new 7c9e05ea8c6 [FLINK-32974][client] Avoid creating a new temporary
directory every time for RestClusterClient
7c9e05ea8c6 is described below
commit 7c9e05ea8c67b12c657b60cd5e6d1bea52b4f9a3
Author: Lijie Wang <[email protected]>
AuthorDate: Wed Sep 20 12:04:25 2023 +0800
[FLINK-32974][client] Avoid creating a new temporary directory every time
for RestClusterClient
This closes #23363
---
.../client/program/rest/RestClusterClient.java | 35 ++++------------------
.../client/program/rest/RestClusterClientTest.java | 31 ++++++++++---------
2 files changed, 22 insertions(+), 44 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 2ebcc14d5fe..a1ef5f7e0a9 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -158,7 +158,6 @@ public class RestClusterClient<T> implements
ClusterClient<T> {
private static final Logger LOG =
LoggerFactory.getLogger(RestClusterClient.class);
private final RestClusterClientConfiguration
restClusterClientConfiguration;
- private final java.nio.file.Path tempDir;
private final Configuration configuration;
@@ -195,13 +194,7 @@ public class RestClusterClient<T> implements
ClusterClient<T> {
public RestClusterClient(
Configuration config, T clusterId,
ClientHighAvailabilityServicesFactory factory)
throws Exception {
- this(
- config,
- null,
- clusterId,
- new ExponentialWaitStrategy(10L, 2000L),
- factory,
- Files.createTempDirectory("flink-rest-client-jobgraphs"));
+ this(config, null, clusterId, new ExponentialWaitStrategy(10L, 2000L),
factory);
}
@VisibleForTesting
@@ -216,24 +209,7 @@ public class RestClusterClient<T> implements
ClusterClient<T> {
restClient,
clusterId,
waitStrategy,
- Files.createTempDirectory("flink-rest-client-jobgraphs"));
- }
-
- @VisibleForTesting
- RestClusterClient(
- Configuration configuration,
- @Nullable RestClient restClient,
- T clusterId,
- WaitStrategy waitStrategy,
- java.nio.file.Path tmpDir)
- throws Exception {
- this(
- configuration,
- restClient,
- clusterId,
- waitStrategy,
- DefaultClientHighAvailabilityServicesFactory.INSTANCE,
- tmpDir);
+ DefaultClientHighAvailabilityServicesFactory.INSTANCE);
}
private RestClusterClient(
@@ -241,14 +217,12 @@ public class RestClusterClient<T> implements
ClusterClient<T> {
@Nullable RestClient restClient,
T clusterId,
WaitStrategy waitStrategy,
- ClientHighAvailabilityServicesFactory clientHAServicesFactory,
- java.nio.file.Path tempDir)
+ ClientHighAvailabilityServicesFactory clientHAServicesFactory)
throws Exception {
this.configuration = checkNotNull(configuration);
this.restClusterClientConfiguration =
RestClusterClientConfiguration.fromConfiguration(configuration);
- this.tempDir = tempDir;
if (restClient != null) {
this.restClient = restClient;
@@ -354,7 +328,8 @@ public class RestClusterClient<T> implements
ClusterClient<T> {
() -> {
try {
final java.nio.file.Path jobGraphFile =
- Files.createTempFile(tempDir,
"flink-jobgraph", ".bin");
+ Files.createTempFile(
+ "flink-jobgraph-" +
jobGraph.getJobID(), ".bin");
try (ObjectOutputStream objectOut =
new ObjectOutputStream(
Files.newOutputStream(jobGraphFile))) {
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 2ff6dea2a98..7b44c6f469f 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -217,18 +217,12 @@ class RestClusterClientTest {
private RestClusterClient<StandaloneClusterId> createRestClusterClient(
int port, Configuration clientConfig) throws Exception {
- return createRestClusterClient(port, clientConfig,
Files.createTempDirectory("flink"));
- }
-
- private RestClusterClient<StandaloneClusterId> createRestClusterClient(
- int port, Configuration clientConfig, Path tmpDir) throws
Exception {
clientConfig.setInteger(RestOptions.PORT, port);
return new RestClusterClient<>(
clientConfig,
createRestClient(),
StandaloneClusterId.getInstance(),
- (attempt) -> 0,
- tmpDir);
+ (attempt) -> 0);
}
@Nonnull
@@ -984,23 +978,32 @@ class RestClusterClientTest {
}
@Test
- void testJobGraphFileCleanedUpOnJobSubmissionFailure(@TempDir Path tmp)
throws Exception {
+ void testJobGraphFileCleanedUpOnJobSubmissionFailure() throws Exception {
+ final Path jobGraphFileDir = getTempDir();
try (final TestRestServerEndpoint restServerEndpoint =
createRestServerEndpoint(new SubmissionFailingHandler())) {
try (RestClusterClient<?> restClusterClient =
- createRestClusterClient(
- restServerEndpoint.getServerAddress().getPort(),
- new Configuration(restConfig),
- tmp)) {
+
createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) {
assertThatThrownBy(() ->
restClusterClient.submitJob(jobGraph).join())
.hasCauseInstanceOf(JobSubmissionException.class);
- try (Stream<Path> files = Files.list(tmp)) {
- assertThat(files).isEmpty();
+ try (Stream<Path> files = Files.list(jobGraphFileDir)) {
+ assertThat(files)
+ .noneMatch(
+ path ->
+ path.toString()
+
.contains(jobGraph.getJobID().toString()));
}
}
}
}
+ private static Path getTempDir() throws IOException {
+ Path tempFile = Files.createTempFile("test", ".bin");
+ Path tempDir = tempFile.getParent();
+ Files.delete(tempFile);
+ return tempDir;
+ }
+
private final class SubmissionFailingHandler
extends TestHandler<
JobSubmitRequestBody, JobSubmitResponseBody,
EmptyMessageParameters> {