Repository: flink
Updated Branches:
  refs/heads/release-1.1 06496439a -> 62c666f57 (forced update)


[FLINK-4485] close and remove user class loader after job completion

Keeping the user class loader around after job completion may lead to
excessive temp space usage because all user jars are kept until the
class loader is garbage collected. Tests showed that garbage collection
can be delayed for a long time after the class loader is no longer
referenced. Note that for the class loader to become unreferenced,
its job has to be removed from the archive.

The fastest way to minimize temp space usage is to close and remove the
URLClassLoader after job completion. This requires us to keep a
serializable copy of all data which needs the user class loader after
job completion, e.g. to display data on the web interface.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/62c666f5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/62c666f5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/62c666f5

Branch: refs/heads/release-1.1
Commit: 62c666f5794fa211bf570874b1b77044fd6840ac
Parents: 8fd08bf
Author: Maximilian Michels <[email protected]>
Authored: Wed Sep 14 11:00:58 2016 +0200
Committer: Maximilian Michels <[email protected]>
Committed: Sat Sep 24 14:21:07 2016 +0200

----------------------------------------------------------------------
 .../flink/client/program/JobWithJars.java       |  4 +-
 .../webmonitor/handlers/JobConfigHandler.java   | 47 +++++-------
 .../librarycache/BlobLibraryCacheManager.java   | 42 ++++++-----
 .../librarycache/FlinkUserCodeClassLoader.java  | 35 +++++++++
 .../runtime/executiongraph/ExecutionGraph.java  | 34 +++++++++
 .../archive/ExecutionConfigSummary.java         | 75 ++++++++++++++++++++
 .../apache/flink/runtime/taskmanager/Task.java  |  3 +-
 .../flink/test/web/WebFrontendITCase.java       |  2 +-
 8 files changed, 191 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/62c666f5/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java 
b/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
index ef02527..d5a3014 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
@@ -22,12 +22,12 @@ import java.io.File;
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.net.URL;
-import java.net.URLClassLoader;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
 import org.apache.flink.api.common.Plan;
+import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
 
 /**
  * A JobWithJars is a Flink dataflow plan, together with a bunch of JAR files 
that contain
@@ -134,6 +134,6 @@ public class JobWithJars {
                for (int i = 0; i < classpaths.size(); i++) {
                        urls[i + jars.size()] = classpaths.get(i);
                }
-               return new URLClassLoader(urls, parent);
+               return new FlinkUserCodeClassLoader(urls, parent);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/62c666f5/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
index cd63630..75389b1 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
@@ -22,8 +22,8 @@ import java.io.StringWriter;
 import java.util.Map;
 
 import com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.archive.ExecutionConfigSummary;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
 /**
@@ -45,37 +45,28 @@ public class JobConfigHandler extends 
AbstractExecutionGraphRequestHandler {
                gen.writeStringField("jid", graph.getJobID().toString());
                gen.writeStringField("name", graph.getJobName());
 
-               ExecutionConfig ec;
-               try {
-                       ec = 
graph.getSerializedExecutionConfig().deserializeValue(graph.getUserClassLoader());
-               } catch (Exception e) {
-                       throw new RuntimeException("Couldn't deserialize 
ExecutionConfig.", e);
-               }
+               final ExecutionConfigSummary summary = 
graph.getExecutionConfigSummary();
 
-               if (ec != null) {
+               if (summary != null) {
                        gen.writeObjectFieldStart("execution-config");
-                       
-                       gen.writeStringField("execution-mode", 
ec.getExecutionMode().name());
-
-                       final String restartStrategyDescription = 
ec.getRestartStrategy() != null ? ec.getRestartStrategy().getDescription() : 
"default";
-                       gen.writeStringField("restart-strategy", 
restartStrategyDescription);
-                       gen.writeNumberField("job-parallelism", 
ec.getParallelism());
-                       gen.writeBooleanField("object-reuse-mode", 
ec.isObjectReuseEnabled());
-
-                       ExecutionConfig.GlobalJobParameters uc = 
ec.getGlobalJobParameters();
-                       if (uc != null) {
-                               Map<String, String> ucVals = uc.toMap();
-                               if (ucVals != null) {
-                                       
gen.writeObjectFieldStart("user-config");
-                                       
-                                       for (Map.Entry<String, String> ucVal : 
ucVals.entrySet()) {
-                                               
gen.writeStringField(ucVal.getKey(), ucVal.getValue());
-                                       }
-
-                                       gen.writeEndObject();
+
+                       gen.writeStringField("execution-mode", 
summary.getExecutionMode());
+
+                       gen.writeStringField("restart-strategy", 
summary.getRestartStrategyDescription());
+                       gen.writeNumberField("job-parallelism", 
summary.getParallelism());
+                       gen.writeBooleanField("object-reuse-mode", 
summary.getObjectReuseEnabled());
+
+                       Map<String, String> ucVals = 
summary.getGlobalJobParameters();
+                       if (ucVals != null) {
+                               gen.writeObjectFieldStart("user-config");
+
+                               for (Map.Entry<String, String> ucVal : 
ucVals.entrySet()) {
+                                       gen.writeStringField(ucVal.getKey(), 
ucVal.getValue());
                                }
+
+                               gen.writeEndObject();
                        }
-                       
+
                        gen.writeEndObject();
                }
                gen.writeEndObject();

http://git-wip-us.apache.org/repos/asf/flink/blob/62c666f5/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
index d1fbc70..c94768d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.execution.librarycache;
 import java.io.File;
 import java.io.IOException;
 import java.net.URL;
-import java.net.URLClassLoader;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -138,8 +138,7 @@ public final class BlobLibraryCacheManager extends 
TimerTask implements LibraryC
                                        count++;
                                }
 
-                               URLClassLoader classLoader = new 
FlinkUserCodeClassLoader(urls);
-                               cacheEntries.put(jobId, new 
LibraryCacheEntry(requiredJarFiles, classLoader, task));
+                               cacheEntries.put(jobId, new 
LibraryCacheEntry(requiredJarFiles, urls, task));
                        }
                        else {
                                entry.register(task, requiredJarFiles);
@@ -156,14 +155,16 @@ public final class BlobLibraryCacheManager extends 
TimerTask implements LibraryC
        public void unregisterTask(JobID jobId, ExecutionAttemptID task) {
                Preconditions.checkNotNull(jobId, "The JobId must not be 
null.");
                Preconditions.checkNotNull(task, "The task execution id must 
not be null.");
-               
+
                synchronized (lockObject) {
                        LibraryCacheEntry entry = cacheEntries.get(jobId);
-                       
+
                        if (entry != null) {
                                if (entry.unregister(task)) {
                                        cacheEntries.remove(jobId);
-                                       
+
+                                       entry.releaseClassLoader();
+
                                        for (BlobKey key : 
entry.getLibraries()) {
                                                
unregisterReferenceToBlobKey(key);
                                        }
@@ -286,17 +287,17 @@ public final class BlobLibraryCacheManager extends 
TimerTask implements LibraryC
         */
        private static class LibraryCacheEntry {
                
-               private final ClassLoader classLoader;
+               private final FlinkUserCodeClassLoader classLoader;
                
                private final Set<ExecutionAttemptID> referenceHolders;
                
                private final Set<BlobKey> libraries;
                
                
-               public LibraryCacheEntry(Collection<BlobKey> libraries, 
ClassLoader classLoader, ExecutionAttemptID initialReference) {
-                       this.classLoader = classLoader;
-                       this.libraries = new HashSet<BlobKey>(libraries);
-                       this.referenceHolders = new 
HashSet<ExecutionAttemptID>();
+               public LibraryCacheEntry(Collection<BlobKey> libraries, URL[] 
libraryURLs, ExecutionAttemptID initialReference) {
+                       this.classLoader = new 
FlinkUserCodeClassLoader(libraryURLs);
+                       this.libraries = new HashSet<>(libraries);
+                       this.referenceHolders = new HashSet<>();
                        this.referenceHolders.add(initialReference);
                }
                
@@ -326,15 +327,18 @@ public final class BlobLibraryCacheManager extends 
TimerTask implements LibraryC
                public int getNumberOfReferenceHolders() {
                        return referenceHolders.size();
                }
-       }
-
-       /**
-        * Give the URLClassLoader a nicer name for debugging purposes.
-        */
-       private static class FlinkUserCodeClassLoader extends URLClassLoader {
 
-               public FlinkUserCodeClassLoader(URL[] urls) {
-                       super(urls, 
FlinkUserCodeClassLoader.class.getClassLoader());
+               /**
+                * Release the class loader to ensure any file descriptors are 
closed
+                * and the cached libraries are deleted immediately.
+                */
+               void releaseClassLoader() {
+                       try {
+                               classLoader.close();
+                       } catch (IOException e) {
+                               LOG.warn("Failed to release user code class 
loader for " + Arrays.toString(libraries.toArray()));
+                       }
                }
        }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/62c666f5/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader.java
new file mode 100644
index 0000000..015f6c7
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.execution.librarycache;
+
+import java.net.URL;
+import java.net.URLClassLoader;
+
+/**
+ * Gives the URLClassLoader a nicer name for debugging purposes.
+ */
+public class FlinkUserCodeClassLoader extends URLClassLoader {
+
+       public FlinkUserCodeClassLoader(URL[] urls) {
+               this(urls, FlinkUserCodeClassLoader.class.getClassLoader());
+       }
+
+       public FlinkUserCodeClassLoader(URL[] urls, ClassLoader parent) {
+               super(urls, parent);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/62c666f5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 4229105..8cf8354 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -41,6 +41,8 @@ import 
org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
 import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
+import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
+import org.apache.flink.runtime.executiongraph.archive.ExecutionConfigSummary;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -237,6 +239,9 @@ public class ExecutionGraph implements Serializable {
        // ------ Fields that are only relevant for archived execution graphs 
------------
        private String jsonPlan;
 
+       /** Serializable summary of all job config values, e.g. for web 
interface */
+       private ExecutionConfigSummary executionConfigSummary;
+
        // 
--------------------------------------------------------------------------------------------
        //   Constructors
        // 
--------------------------------------------------------------------------------------------
@@ -314,6 +319,16 @@ public class ExecutionGraph implements Serializable {
                this.restartStrategy = restartStrategy;
 
                metricGroup.gauge(RESTARTING_TIME_METRIC_NAME, new 
RestartTimeGauge());
+
+               // create a summary of all relevant data accessed in the web 
interface's JobConfigHandler
+               try {
+                       ExecutionConfig executionConfig = 
serializedConfig.deserializeValue(userClassLoader);
+                       if (executionConfig != null) {
+                               this.executionConfigSummary = new 
ExecutionConfigSummary(executionConfig);
+                       }
+               } catch (IOException | ClassNotFoundException e) {
+                       LOG.error("Couldn't create ExecutionConfigSummary for 
job {} ", jobID, e);
+               }
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -1014,10 +1029,29 @@ public class ExecutionGraph implements Serializable {
                jobStatusListenerActors.clear();
                executionListenerActors.clear();
 
+               if (userClassLoader instanceof FlinkUserCodeClassLoader) {
+                       try {
+                               // close the classloader to free space of user 
jars immediately
+                               // otherwise we have to wait until garbage 
collection
+                               ((FlinkUserCodeClassLoader) 
userClassLoader).close();
+                       } catch (IOException e) {
+                               LOG.warn("Failed to close the user classloader 
for job {}", jobID, e);
+                       }
+               }
+               userClassLoader = null;
+
                isArchived = true;
        }
 
        /**
+        * Returns the serializable ExecutionConfigSummary
+        * @return ExecutionConfigSummary which may be null in case of errors
+        */
+       public ExecutionConfigSummary getExecutionConfigSummary() {
+               return executionConfigSummary;
+       }
+
+       /**
         * Returns the serialized {@link ExecutionConfig}.
         *
         * @return ExecutionConfig

http://git-wip-us.apache.org/repos/asf/flink/blob/62c666f5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/archive/ExecutionConfigSummary.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/archive/ExecutionConfigSummary.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/archive/ExecutionConfigSummary.java
new file mode 100644
index 0000000..ad4677f
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/archive/ExecutionConfigSummary.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph.archive;
+
+import org.apache.flink.api.common.ExecutionConfig;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Serializable class which is created when archiving the job.
+ * It can be used to display job information on the web interface
+ * without having to keep the classloader around after job completion.
+ */
+public class ExecutionConfigSummary implements Serializable {
+
+       private final String executionMode;
+       private final String restartStrategyDescription;
+       private final int parallelism;
+       private final boolean objectReuseEnabled;
+       private final Map<String, String> globalJobParameters;
+
+       public ExecutionConfigSummary(ExecutionConfig ec) {
+               executionMode = ec.getExecutionMode().name();
+               if (ec.getRestartStrategy() != null) {
+                       restartStrategyDescription = 
ec.getRestartStrategy().getDescription();
+               } else {
+                       restartStrategyDescription = "default";
+               }
+               parallelism = ec.getParallelism();
+               objectReuseEnabled = ec.isObjectReuseEnabled();
+               if (ec.getGlobalJobParameters() != null
+                               && ec.getGlobalJobParameters().toMap() != null) 
{
+                       globalJobParameters = 
ec.getGlobalJobParameters().toMap();
+               } else {
+                       globalJobParameters = Collections.emptyMap();
+               }
+       }
+
+       public String getExecutionMode() {
+               return executionMode;
+       }
+
+       public String getRestartStrategyDescription() {
+               return restartStrategyDescription;
+       }
+
+       public int getParallelism() {
+               return parallelism;
+       }
+
+       public boolean getObjectReuseEnabled() {
+               return objectReuseEnabled;
+       }
+
+       public Map<String, String> getGlobalJobParameters() {
+               return globalJobParameters;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/62c666f5/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index dbc0b62..25a7e29 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -459,6 +459,7 @@ public class Task implements Runnable {
                Map<String, Future<Path>> distributedCacheEntries = new 
HashMap<String, Future<Path>>();
                AbstractInvokable invokable = null;
 
+               ClassLoader userCodeClassLoader = null;
                try {
                        // ----------------------------
                        //  Task Bootstrap - We periodically
@@ -469,7 +470,7 @@ public class Task implements Runnable {
                        // this may involve downloading the job's JAR files 
and/or classes
                        LOG.info("Loading JAR files for task " + 
taskNameWithSubtask);
 
-                       final ClassLoader userCodeClassLoader = 
createUserCodeClassloader(libraryCache);
+                       userCodeClassLoader = 
createUserCodeClassloader(libraryCache);
                        final ExecutionConfig executionConfig = 
serializedExecutionConfig.deserializeValue(userCodeClassLoader);
 
                        if (executionConfig.getTaskCancellationInterval() >= 0) 
{

http://git-wip-us.apache.org/repos/asf/flink/blob/62c666f5/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java 
b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
index 032c8fe..39042d2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
@@ -249,7 +249,7 @@ public class WebFrontendITCase {
                        assertEquals(response.getType(), 
MimeTypes.getMimeTypeForExtension("json"));
                        assertEquals("{\"jid\":\""+jid+"\",\"name\":\"Stoppable 
streaming test job\"," +
                                        
"\"execution-config\":{\"execution-mode\":\"PIPELINED\",\"restart-strategy\":\"default\","
 +
-                                       
"\"job-parallelism\":-1,\"object-reuse-mode\":false}}", response.getContent());
+                                       
"\"job-parallelism\":-1,\"object-reuse-mode\":false,\"user-config\":{}}}", 
response.getContent());
                }
        }
 

Reply via email to