This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push:
new e5fb97e305d [FLINK-32226][client] Cleanup jobgraph file on submission failure
e5fb97e305d is described below
commit e5fb97e305d6f23bdd0fae6484cb2f6c5c2dcd1d
Author: Chesnay Schepler <[email protected]>
AuthorDate: Wed May 31 13:36:14 2023 +0200
[FLINK-32226][client] Cleanup jobgraph file on submission failure
---
.../client/program/rest/RestClusterClient.java | 35 +++++++++++++++++++---
.../client/program/rest/RestClusterClientTest.java | 29 +++++++++++++++++-
2 files changed, 59 insertions(+), 5 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index a05350d5e58..2ebcc14d5fe 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -158,6 +158,7 @@ public class RestClusterClient<T> implements
ClusterClient<T> {
private static final Logger LOG =
LoggerFactory.getLogger(RestClusterClient.class);
private final RestClusterClientConfiguration
restClusterClientConfiguration;
+ private final java.nio.file.Path tempDir;
private final Configuration configuration;
@@ -194,7 +195,13 @@ public class RestClusterClient<T> implements
ClusterClient<T> {
public RestClusterClient(
Configuration config, T clusterId,
ClientHighAvailabilityServicesFactory factory)
throws Exception {
- this(config, null, clusterId, new ExponentialWaitStrategy(10L, 2000L),
factory);
+ this(
+ config,
+ null,
+ clusterId,
+ new ExponentialWaitStrategy(10L, 2000L),
+ factory,
+ Files.createTempDirectory("flink-rest-client-jobgraphs"));
}
@VisibleForTesting
@@ -209,7 +216,24 @@ public class RestClusterClient<T> implements
ClusterClient<T> {
restClient,
clusterId,
waitStrategy,
- DefaultClientHighAvailabilityServicesFactory.INSTANCE);
+ Files.createTempDirectory("flink-rest-client-jobgraphs"));
+ }
+
+ @VisibleForTesting
+ RestClusterClient(
+ Configuration configuration,
+ @Nullable RestClient restClient,
+ T clusterId,
+ WaitStrategy waitStrategy,
+ java.nio.file.Path tmpDir)
+ throws Exception {
+ this(
+ configuration,
+ restClient,
+ clusterId,
+ waitStrategy,
+ DefaultClientHighAvailabilityServicesFactory.INSTANCE,
+ tmpDir);
}
private RestClusterClient(
@@ -217,12 +241,14 @@ public class RestClusterClient<T> implements
ClusterClient<T> {
@Nullable RestClient restClient,
T clusterId,
WaitStrategy waitStrategy,
- ClientHighAvailabilityServicesFactory clientHAServicesFactory)
+ ClientHighAvailabilityServicesFactory clientHAServicesFactory,
+ java.nio.file.Path tempDir)
throws Exception {
this.configuration = checkNotNull(configuration);
this.restClusterClientConfiguration =
RestClusterClientConfiguration.fromConfiguration(configuration);
+ this.tempDir = tempDir;
if (restClient != null) {
this.restClient = restClient;
@@ -328,7 +354,7 @@ public class RestClusterClient<T> implements
ClusterClient<T> {
() -> {
try {
final java.nio.file.Path jobGraphFile =
- Files.createTempFile("flink-jobgraph",
".bin");
+ Files.createTempFile(tempDir,
"flink-jobgraph", ".bin");
try (ObjectOutputStream objectOut =
new ObjectOutputStream(
Files.newOutputStream(jobGraphFile))) {
@@ -430,6 +456,7 @@ public class RestClusterClient<T> implements
ClusterClient<T> {
});
submissionFuture
+ .exceptionally(ignored -> null) // ignore errors
.thenCompose(ignored -> jobGraphFileFuture)
.thenAccept(
jobGraphFile -> {
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 6c091ebf9a7..2ff6dea2a98 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.io.network.partition.DataSetMetaInfo;
@@ -127,6 +128,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URL;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
@@ -148,6 +150,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -214,12 +217,18 @@ class RestClusterClientTest {
private RestClusterClient<StandaloneClusterId> createRestClusterClient(
int port, Configuration clientConfig) throws Exception {
+ return createRestClusterClient(port, clientConfig,
Files.createTempDirectory("flink"));
+ }
+
+ private RestClusterClient<StandaloneClusterId> createRestClusterClient(
+ int port, Configuration clientConfig, Path tmpDir) throws
Exception {
clientConfig.setInteger(RestOptions.PORT, port);
return new RestClusterClient<>(
clientConfig,
createRestClient(),
StandaloneClusterId.getInstance(),
- (attempt) -> 0);
+ (attempt) -> 0,
+ tmpDir);
}
@Nonnull
@@ -974,6 +983,24 @@ class RestClusterClientTest {
}
}
+ @Test
+ void testJobGraphFileCleanedUpOnJobSubmissionFailure(@TempDir Path tmp)
throws Exception {
+ try (final TestRestServerEndpoint restServerEndpoint =
+ createRestServerEndpoint(new SubmissionFailingHandler())) {
+ try (RestClusterClient<?> restClusterClient =
+ createRestClusterClient(
+ restServerEndpoint.getServerAddress().getPort(),
+ new Configuration(restConfig),
+ tmp)) {
+ assertThatThrownBy(() ->
restClusterClient.submitJob(jobGraph).join())
+ .hasCauseInstanceOf(JobSubmissionException.class);
+ try (Stream<Path> files = Files.list(tmp)) {
+ assertThat(files).isEmpty();
+ }
+ }
+ }
+ }
+
private final class SubmissionFailingHandler
extends TestHandler<
JobSubmitRequestBody, JobSubmitResponseBody,
EmptyMessageParameters> {