This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 30f6964 [FLINK-21164][rest] Delete temporary jars
30f6964 is described below
commit 30f6964a9b72c79925c62f952a4559f2e48bdf06
Author: Chesnay Schepler <[email protected]>
AuthorDate: Fri Jan 29 19:18:38 2021 +0100
[FLINK-21164][rest] Delete temporary jars
---
.../org/apache/flink/client/cli/CliFrontend.java | 8 ++------
.../flink/client/program/PackagedProgram.java | 18 ++++++++++++++++--
.../webmonitor/handlers/JarListHandler.java | 22 +++++++++-------------
.../webmonitor/handlers/JarPlanHandler.java | 9 +++++++--
.../runtime/webmonitor/handlers/JarRunHandler.java | 1 +
.../webmonitor/handlers/utils/JarHandlerUtils.java | 6 ++++--
6 files changed, 39 insertions(+), 25 deletions(-)
diff --git
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index a489219..c5ff839 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -242,12 +242,8 @@ public class CliFrontend {
LOG.debug("Effective executor configuration: {}",
effectiveConfiguration);
- final PackagedProgram program = getPackagedProgram(programOptions,
effectiveConfiguration);
-
- try {
+ try (PackagedProgram program = getPackagedProgram(programOptions,
effectiveConfiguration)) {
executeProgram(effectiveConfiguration, program);
- } finally {
- program.deleteExtractedLibraries();
}
}
@@ -387,7 +383,7 @@ public class CliFrontend {
}
} finally {
if (program != null) {
- program.deleteExtractedLibraries();
+ program.close();
}
}
}
diff --git
a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
index 006d136..051a56d 100644
---
a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
+++
b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
@@ -26,6 +26,9 @@ import org.apache.flink.runtime.security.FlinkSecurityManager;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.JarUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import javax.annotation.Nullable;
import java.io.BufferedInputStream;
@@ -58,7 +61,9 @@ import static
org.apache.flink.util.Preconditions.checkNotNull;
* This class encapsulates represents a program, packaged in a jar file. It
supplies functionality
* to extract nested libraries, search for the program entry point, and
extract a program plan.
*/
-public class PackagedProgram {
+public class PackagedProgram implements AutoCloseable {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(PackagedProgram.class);
/**
* Property name of the entry in JAR manifest file that describes the
Flink specific entry
@@ -290,7 +295,7 @@ public class PackagedProgram {
}
/** Deletes all temporary files created for contained packaged libraries.
*/
- public void deleteExtractedLibraries() {
+ private void deleteExtractedLibraries() {
deleteExtractedLibraries(this.extractedTempLibraries);
this.extractedTempLibraries.clear();
}
@@ -617,6 +622,15 @@ public class PackagedProgram {
}
}
+ @Override
+ public void close() {
+ try {
+ deleteExtractedLibraries();
+ } catch (Exception e) {
+ LOG.debug("Error while deleting jars extracted from user-jar.", e);
+ }
+ }
+
/** A Builder For {@link PackagedProgram}. */
public static class Builder {
diff --git
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
index fafe616..27ef7f4 100644
---
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
+++
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
@@ -145,23 +145,19 @@ public class JarListHandler
for (String clazz : classes) {
clazz = clazz.trim();
- PackagedProgram program = null;
- try {
- program =
- PackagedProgram.newBuilder()
- .setJarFile(f)
-
.setEntryPointClassName(clazz)
-
.setConfiguration(configuration)
- .build();
- } catch (Exception ignored) {
- // ignore jar files which throw an error
upon creating a
- // PackagedProgram
- }
- if (program != null) {
+ try (PackagedProgram program =
+ PackagedProgram.newBuilder()
+ .setJarFile(f)
+ .setEntryPointClassName(clazz)
+
.setConfiguration(configuration)
+ .build()) {
JarListInfo.JarEntryInfo jarEntryInfo =
new JarListInfo.JarEntryInfo(
clazz,
program.getDescription());
jarEntryList.add(jarEntryInfo);
+ } catch (Exception ignored) {
+ // ignore jar files which throw an error
upon creating a
+ // PackagedProgram
}
}
diff --git
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
index d7cd6fb..4690792 100644
---
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
+++
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
@@ -100,8 +101,12 @@ public class JarPlanHandler
return CompletableFuture.supplyAsync(
() -> {
- final JobGraph jobGraph =
context.toJobGraph(configuration, true);
- return planGenerator.apply(jobGraph);
+ try (PackagedProgram packagedProgram =
+ context.toPackagedProgram(configuration)) {
+ final JobGraph jobGraph =
+ context.toJobGraph(packagedProgram,
configuration, true);
+ return planGenerator.apply(jobGraph);
+ }
},
executor);
}
diff --git
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 06351c2..4573300 100644
---
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -103,6 +103,7 @@ public class JarRunHandler
executor)
.handle(
(jobIds, throwable) -> {
+ program.close();
if (throwable != null) {
throw new CompletionException(
new RestHandlerException(
diff --git
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java
index ffcac31..84e69a3 100644
---
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java
+++
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java
@@ -151,9 +151,11 @@ public class JarHandlerUtils {
URL::toString);
}
- public JobGraph toJobGraph(Configuration configuration, boolean
suppressOutput) {
+ public JobGraph toJobGraph(
+ PackagedProgram packagedProgram,
+ Configuration configuration,
+ boolean suppressOutput) {
try {
- final PackagedProgram packagedProgram =
toPackagedProgram(configuration);
return PackagedProgramUtils.createJobGraph(
packagedProgram, configuration, parallelism, jobId,
suppressOutput);
} catch (final ProgramInvocationException e) {