Repository: flink Updated Branches: refs/heads/release-1.1 06496439a -> 62c666f57 (forced update)
[FLINK-4485] close and remove user class loader after job completion Keeping the user class loader around after job completion may lead to excessive temp space usage because all user jars are kept until the class loader is garbage collected. Tests showed that garbage collection can be delayed for a long time after the class loader is not referenced anymore. Note that for the class loader to not be referenced anymore, its job has to be removed from the archive. The fastest way to minimize temp space usage is to close and remove the URLClassLoader after job completion. This requires us to keep a serializable copy of all data which needs the user class loader after job completion, e.g. to display data on the web interface. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/62c666f5 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/62c666f5 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/62c666f5 Branch: refs/heads/release-1.1 Commit: 62c666f5794fa211bf570874b1b77044fd6840ac Parents: 8fd08bf Author: Maximilian Michels <[email protected]> Authored: Wed Sep 14 11:00:58 2016 +0200 Committer: Maximilian Michels <[email protected]> Committed: Sat Sep 24 14:21:07 2016 +0200 ---------------------------------------------------------------------- .../flink/client/program/JobWithJars.java | 4 +- .../webmonitor/handlers/JobConfigHandler.java | 47 +++++------- .../librarycache/BlobLibraryCacheManager.java | 42 ++++++----- .../librarycache/FlinkUserCodeClassLoader.java | 35 +++++++++ .../runtime/executiongraph/ExecutionGraph.java | 34 +++++++++ .../archive/ExecutionConfigSummary.java | 75 ++++++++++++++++++++ .../apache/flink/runtime/taskmanager/Task.java | 3 +- .../flink/test/web/WebFrontendITCase.java | 2 +- 8 files changed, 191 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/flink/blob/62c666f5/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java b/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java index ef02527..d5a3014 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java @@ -22,12 +22,12 @@ import java.io.File; import java.io.IOException; import java.net.URISyntaxException; import java.net.URL; -import java.net.URLClassLoader; import java.util.ArrayList; import java.util.Collections; import java.util.List; import org.apache.flink.api.common.Plan; +import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader; /** * A JobWithJars is a Flink dataflow plan, together with a bunch of JAR files that contain @@ -134,6 +134,6 @@ public class JobWithJars { for (int i = 0; i < classpaths.size(); i++) { urls[i + jars.size()] = classpaths.get(i); } - return new URLClassLoader(urls, parent); + return new FlinkUserCodeClassLoader(urls, parent); } } http://git-wip-us.apache.org/repos/asf/flink/blob/62c666f5/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java index cd63630..75389b1 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java @@ -22,8 +22,8 @@ import java.io.StringWriter; 
import java.util.Map; import com.fasterxml.jackson.core.JsonGenerator; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.archive.ExecutionConfigSummary; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; /** @@ -45,37 +45,28 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler { gen.writeStringField("jid", graph.getJobID().toString()); gen.writeStringField("name", graph.getJobName()); - ExecutionConfig ec; - try { - ec = graph.getSerializedExecutionConfig().deserializeValue(graph.getUserClassLoader()); - } catch (Exception e) { - throw new RuntimeException("Couldn't deserialize ExecutionConfig.", e); - } + final ExecutionConfigSummary summary = graph.getExecutionConfigSummary(); - if (ec != null) { + if (summary != null) { gen.writeObjectFieldStart("execution-config"); - - gen.writeStringField("execution-mode", ec.getExecutionMode().name()); - - final String restartStrategyDescription = ec.getRestartStrategy() != null ? 
ec.getRestartStrategy().getDescription() : "default"; - gen.writeStringField("restart-strategy", restartStrategyDescription); - gen.writeNumberField("job-parallelism", ec.getParallelism()); - gen.writeBooleanField("object-reuse-mode", ec.isObjectReuseEnabled()); - - ExecutionConfig.GlobalJobParameters uc = ec.getGlobalJobParameters(); - if (uc != null) { - Map<String, String> ucVals = uc.toMap(); - if (ucVals != null) { - gen.writeObjectFieldStart("user-config"); - - for (Map.Entry<String, String> ucVal : ucVals.entrySet()) { - gen.writeStringField(ucVal.getKey(), ucVal.getValue()); - } - - gen.writeEndObject(); + + gen.writeStringField("execution-mode", summary.getExecutionMode()); + + gen.writeStringField("restart-strategy", summary.getRestartStrategyDescription()); + gen.writeNumberField("job-parallelism", summary.getParallelism()); + gen.writeBooleanField("object-reuse-mode", summary.getObjectReuseEnabled()); + + Map<String, String> ucVals = summary.getGlobalJobParameters(); + if (ucVals != null) { + gen.writeObjectFieldStart("user-config"); + + for (Map.Entry<String, String> ucVal : ucVals.entrySet()) { + gen.writeStringField(ucVal.getKey(), ucVal.getValue()); } + + gen.writeEndObject(); } - + gen.writeEndObject(); } gen.writeEndObject(); http://git-wip-us.apache.org/repos/asf/flink/blob/62c666f5/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java index d1fbc70..c94768d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java @@ -21,7 +21,7 @@ 
package org.apache.flink.runtime.execution.librarycache; import java.io.File; import java.io.IOException; import java.net.URL; -import java.net.URLClassLoader; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -138,8 +138,7 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC count++; } - URLClassLoader classLoader = new FlinkUserCodeClassLoader(urls); - cacheEntries.put(jobId, new LibraryCacheEntry(requiredJarFiles, classLoader, task)); + cacheEntries.put(jobId, new LibraryCacheEntry(requiredJarFiles, urls, task)); } else { entry.register(task, requiredJarFiles); @@ -156,14 +155,16 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC public void unregisterTask(JobID jobId, ExecutionAttemptID task) { Preconditions.checkNotNull(jobId, "The JobId must not be null."); Preconditions.checkNotNull(task, "The task execution id must not be null."); - + synchronized (lockObject) { LibraryCacheEntry entry = cacheEntries.get(jobId); - + if (entry != null) { if (entry.unregister(task)) { cacheEntries.remove(jobId); - + + entry.releaseClassLoader(); + for (BlobKey key : entry.getLibraries()) { unregisterReferenceToBlobKey(key); } @@ -286,17 +287,17 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC */ private static class LibraryCacheEntry { - private final ClassLoader classLoader; + private final FlinkUserCodeClassLoader classLoader; private final Set<ExecutionAttemptID> referenceHolders; private final Set<BlobKey> libraries; - public LibraryCacheEntry(Collection<BlobKey> libraries, ClassLoader classLoader, ExecutionAttemptID initialReference) { - this.classLoader = classLoader; - this.libraries = new HashSet<BlobKey>(libraries); - this.referenceHolders = new HashSet<ExecutionAttemptID>(); + public LibraryCacheEntry(Collection<BlobKey> libraries, URL[] libraryURLs, ExecutionAttemptID initialReference) { + 
this.classLoader = new FlinkUserCodeClassLoader(libraryURLs); + this.libraries = new HashSet<>(libraries); + this.referenceHolders = new HashSet<>(); this.referenceHolders.add(initialReference); } @@ -326,15 +327,18 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC public int getNumberOfReferenceHolders() { return referenceHolders.size(); } - } - - /** - * Give the URLClassLoader a nicer name for debugging purposes. - */ - private static class FlinkUserCodeClassLoader extends URLClassLoader { - public FlinkUserCodeClassLoader(URL[] urls) { - super(urls, FlinkUserCodeClassLoader.class.getClassLoader()); + /** + * Release the class loader to ensure any file descriptors are closed + * and the cached libraries are deleted immediately. + */ + void releaseClassLoader() { + try { + classLoader.close(); + } catch (IOException e) { + LOG.warn("Failed to release user code class loader for " + Arrays.toString(libraries.toArray())); + } } } + } http://git-wip-us.apache.org/repos/asf/flink/blob/62c666f5/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader.java new file mode 100644 index 0000000..015f6c7 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.execution.librarycache; + +import java.net.URL; +import java.net.URLClassLoader; + +/** + * Gives the URLClassLoader a nicer name for debugging purposes. + */ +public class FlinkUserCodeClassLoader extends URLClassLoader { + + public FlinkUserCodeClassLoader(URL[] urls) { + this(urls, FlinkUserCodeClassLoader.class.getClassLoader()); + } + + public FlinkUserCodeClassLoader(URL[] urls, ClassLoader parent) { + super(urls, parent); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/62c666f5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 4229105..8cf8354 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -41,6 +41,8 @@ import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore; import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.SuppressRestartsException; +import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader; +import org.apache.flink.runtime.executiongraph.archive.ExecutionConfigSummary; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; @@ -237,6 +239,9 @@ public class ExecutionGraph implements Serializable { // ------ Fields that are only relevant for archived execution graphs ------------ private String jsonPlan; + /** Serializable summary of all job config values, e.g. for web interface */ + private ExecutionConfigSummary executionConfigSummary; + // -------------------------------------------------------------------------------------------- // Constructors // -------------------------------------------------------------------------------------------- @@ -314,6 +319,16 @@ public class ExecutionGraph implements Serializable { this.restartStrategy = restartStrategy; metricGroup.gauge(RESTARTING_TIME_METRIC_NAME, new RestartTimeGauge()); + + // create a summary of all relevant data accessed in the web interface's JobConfigHandler + try { + ExecutionConfig executionConfig = serializedConfig.deserializeValue(userClassLoader); + if (executionConfig != null) { + this.executionConfigSummary = new ExecutionConfigSummary(executionConfig); + } + } catch (IOException | ClassNotFoundException e) { + LOG.error("Couldn't create ExecutionConfigSummary for job {} ", jobID, e); + } } // -------------------------------------------------------------------------------------------- @@ -1014,10 +1029,29 @@ public class ExecutionGraph implements Serializable { jobStatusListenerActors.clear(); executionListenerActors.clear(); + if (userClassLoader instanceof FlinkUserCodeClassLoader) { + try { + // close the classloader to free space of user jars immediately + // otherwise we have to wait until garbage collection + ((FlinkUserCodeClassLoader) userClassLoader).close(); + } 
catch (IOException e) { + LOG.warn("Failed to close the user classloader for job {}", jobID, e); + } + } + userClassLoader = null; + isArchived = true; } /** + * Returns the serializable ExecutionConfigSummary + * @return ExecutionConfigSummary which may be null in case of errors + */ + public ExecutionConfigSummary getExecutionConfigSummary() { + return executionConfigSummary; + } + + /** * Returns the serialized {@link ExecutionConfig}. * * @return ExecutionConfig http://git-wip-us.apache.org/repos/asf/flink/blob/62c666f5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/archive/ExecutionConfigSummary.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/archive/ExecutionConfigSummary.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/archive/ExecutionConfigSummary.java new file mode 100644 index 0000000..ad4677f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/archive/ExecutionConfigSummary.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.executiongraph.archive; + +import org.apache.flink.api.common.ExecutionConfig; + +import java.io.Serializable; +import java.util.Collections; +import java.util.Map; + +/** + * Serializable class which is created when archiving the job. + * It can be used to display job information on the web interface + * without having to keep the classloader around after job completion. + */ +public class ExecutionConfigSummary implements Serializable { + + private final String executionMode; + private final String restartStrategyDescription; + private final int parallelism; + private final boolean objectReuseEnabled; + private final Map<String, String> globalJobParameters; + + public ExecutionConfigSummary(ExecutionConfig ec) { + executionMode = ec.getExecutionMode().name(); + if (ec.getRestartStrategy() != null) { + restartStrategyDescription = ec.getRestartStrategy().getDescription(); + } else { + restartStrategyDescription = "default"; + } + parallelism = ec.getParallelism(); + objectReuseEnabled = ec.isObjectReuseEnabled(); + if (ec.getGlobalJobParameters() != null + && ec.getGlobalJobParameters().toMap() != null) { + globalJobParameters = ec.getGlobalJobParameters().toMap(); + } else { + globalJobParameters = Collections.emptyMap(); + } + } + + public String getExecutionMode() { + return executionMode; + } + + public String getRestartStrategyDescription() { + return restartStrategyDescription; + } + + public int getParallelism() { + return parallelism; + } + + public boolean getObjectReuseEnabled() { + return objectReuseEnabled; + } + + public Map<String, String> getGlobalJobParameters() { + return globalJobParameters; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/62c666f5/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index dbc0b62..25a7e29 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -459,6 +459,7 @@ public class Task implements Runnable { Map<String, Future<Path>> distributedCacheEntries = new HashMap<String, Future<Path>>(); AbstractInvokable invokable = null; + ClassLoader userCodeClassLoader = null; try { // ---------------------------- // Task Bootstrap - We periodically @@ -469,7 +470,7 @@ public class Task implements Runnable { // this may involve downloading the job's JAR files and/or classes LOG.info("Loading JAR files for task " + taskNameWithSubtask); - final ClassLoader userCodeClassLoader = createUserCodeClassloader(libraryCache); + userCodeClassLoader = createUserCodeClassloader(libraryCache); final ExecutionConfig executionConfig = serializedExecutionConfig.deserializeValue(userCodeClassLoader); if (executionConfig.getTaskCancellationInterval() >= 0) { http://git-wip-us.apache.org/repos/asf/flink/blob/62c666f5/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java index 032c8fe..39042d2 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java @@ -249,7 +249,7 @@ public class WebFrontendITCase { assertEquals(response.getType(), MimeTypes.getMimeTypeForExtension("json")); assertEquals("{\"jid\":\""+jid+"\",\"name\":\"Stoppable streaming test job\"," + "\"execution-config\":{\"execution-mode\":\"PIPELINED\",\"restart-strategy\":\"default\"," + - "\"job-parallelism\":-1,\"object-reuse-mode\":false}}", 
response.getContent()); + "\"job-parallelism\":-1,\"object-reuse-mode\":false,\"user-config\":{}}}", response.getContent()); } }
