This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 30f6964  [FLINK-21164][rest] Delete temporary jars
30f6964 is described below

commit 30f6964a9b72c79925c62f952a4559f2e48bdf06
Author: Chesnay Schepler <[email protected]>
AuthorDate: Fri Jan 29 19:18:38 2021 +0100

    [FLINK-21164][rest] Delete temporary jars
---
 .../org/apache/flink/client/cli/CliFrontend.java   |  8 ++------
 .../flink/client/program/PackagedProgram.java      | 18 ++++++++++++++++--
 .../webmonitor/handlers/JarListHandler.java        | 22 +++++++++-------------
 .../webmonitor/handlers/JarPlanHandler.java        |  9 +++++++--
 .../runtime/webmonitor/handlers/JarRunHandler.java |  1 +
 .../webmonitor/handlers/utils/JarHandlerUtils.java |  6 ++++--
 6 files changed, 39 insertions(+), 25 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index a489219..c5ff839 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -242,12 +242,8 @@ public class CliFrontend {
 
         LOG.debug("Effective executor configuration: {}", 
effectiveConfiguration);
 
-        final PackagedProgram program = getPackagedProgram(programOptions, 
effectiveConfiguration);
-
-        try {
+        try (PackagedProgram program = getPackagedProgram(programOptions, 
effectiveConfiguration)) {
             executeProgram(effectiveConfiguration, program);
-        } finally {
-            program.deleteExtractedLibraries();
         }
     }
 
@@ -387,7 +383,7 @@ public class CliFrontend {
             }
         } finally {
             if (program != null) {
-                program.deleteExtractedLibraries();
+                program.close();
             }
         }
     }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
index 006d136..051a56d 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
@@ -26,6 +26,9 @@ import org.apache.flink.runtime.security.FlinkSecurityManager;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.JarUtils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 
 import java.io.BufferedInputStream;
@@ -58,7 +61,9 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * This class encapsulates represents a program, packaged in a jar file. It 
supplies functionality
  * to extract nested libraries, search for the program entry point, and 
extract a program plan.
  */
-public class PackagedProgram {
+public class PackagedProgram implements AutoCloseable {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(PackagedProgram.class);
 
     /**
      * Property name of the entry in JAR manifest file that describes the 
Flink specific entry
@@ -290,7 +295,7 @@ public class PackagedProgram {
     }
 
     /** Deletes all temporary files created for contained packaged libraries. 
*/
-    public void deleteExtractedLibraries() {
+    private void deleteExtractedLibraries() {
         deleteExtractedLibraries(this.extractedTempLibraries);
         this.extractedTempLibraries.clear();
     }
@@ -617,6 +622,15 @@ public class PackagedProgram {
         }
     }
 
+    @Override
+    public void close() {
+        try {
+            deleteExtractedLibraries();
+        } catch (Exception e) {
+            LOG.debug("Error while deleting jars extracted from user-jar.", e);
+        }
+    }
+
     /** A Builder For {@link PackagedProgram}. */
     public static class Builder {
 
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
index fafe616..27ef7f4 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
@@ -145,23 +145,19 @@ public class JarListHandler
                             for (String clazz : classes) {
                                 clazz = clazz.trim();
 
-                                PackagedProgram program = null;
-                                try {
-                                    program =
-                                            PackagedProgram.newBuilder()
-                                                    .setJarFile(f)
-                                                    
.setEntryPointClassName(clazz)
-                                                    
.setConfiguration(configuration)
-                                                    .build();
-                                } catch (Exception ignored) {
-                                    // ignore jar files which throw an error 
upon creating a
-                                    // PackagedProgram
-                                }
-                                if (program != null) {
+                                try (PackagedProgram program =
+                                        PackagedProgram.newBuilder()
+                                                .setJarFile(f)
+                                                .setEntryPointClassName(clazz)
+                                                
.setConfiguration(configuration)
+                                                .build()) {
                                     JarListInfo.JarEntryInfo jarEntryInfo =
                                             new JarListInfo.JarEntryInfo(
                                                     clazz, 
program.getDescription());
                                     jarEntryList.add(jarEntryInfo);
+                                } catch (Exception ignored) {
+                                    // ignore jar files which throw an error 
upon creating a
+                                    // PackagedProgram
                                 }
                             }
 
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
index d7cd6fb..4690792 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
@@ -100,8 +101,12 @@ public class JarPlanHandler
 
         return CompletableFuture.supplyAsync(
                 () -> {
-                    final JobGraph jobGraph = 
context.toJobGraph(configuration, true);
-                    return planGenerator.apply(jobGraph);
+                    try (PackagedProgram packagedProgram =
+                            context.toPackagedProgram(configuration)) {
+                        final JobGraph jobGraph =
+                                context.toJobGraph(packagedProgram, 
configuration, true);
+                        return planGenerator.apply(jobGraph);
+                    }
                 },
                 executor);
     }
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 06351c2..4573300 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -103,6 +103,7 @@ public class JarRunHandler
                         executor)
                 .handle(
                         (jobIds, throwable) -> {
+                            program.close();
                             if (throwable != null) {
                                 throw new CompletionException(
                                         new RestHandlerException(
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java
index ffcac31..84e69a3 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java
@@ -151,9 +151,11 @@ public class JarHandlerUtils {
                     URL::toString);
         }
 
-        public JobGraph toJobGraph(Configuration configuration, boolean 
suppressOutput) {
+        public JobGraph toJobGraph(
+                PackagedProgram packagedProgram,
+                Configuration configuration,
+                boolean suppressOutput) {
             try {
-                final PackagedProgram packagedProgram = 
toPackagedProgram(configuration);
                 return PackagedProgramUtils.createJobGraph(
                         packagedProgram, configuration, parallelism, jobId, 
suppressOutput);
             } catch (final ProgramInvocationException e) {

Reply via email to