Repository: flink Updated Branches: refs/heads/master 2a4ac6600 -> a86b64686
[FLINK-7442] Add option for using a child-first classloader for loading user code This also adds an end-to-end test that verifies correct order for both classes and resources. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a86b6468 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a86b6468 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a86b6468 Branch: refs/heads/master Commit: a86b6468617f7078c375f6484975b59477aab193 Parents: 2a4ac66 Author: Aljoscha Krettek <[email protected]> Authored: Mon Aug 14 14:53:14 2017 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Thu Sep 21 15:01:48 2017 +0200 ---------------------------------------------------------------------- docs/ops/config.md | 5 +- .../flink/client/program/JobWithJars.java | 4 +- .../state/RocksDbMultiClassLoaderTest.java | 35 +--- .../apache/flink/configuration/CoreOptions.java | 8 + flink-end-to-end-tests/pom.xml | 107 ++++++++++++ .../flink/runtime/taskmanager/TaskManager.java | 30 ++++ .../streaming/tests/ClassLoaderTestProgram.java | 102 +++++++++++ .../src/main/resources/.version.properties | 1 + .../apache/flink/runtime/client/JobClient.java | 4 +- .../librarycache/BlobLibraryCacheManager.java | 25 ++- .../librarycache/FlinkUserCodeClassLoader.java | 35 ---- .../librarycache/FlinkUserCodeClassLoaders.java | 170 +++++++++++++++++++ .../runtime/jobmaster/JobManagerServices.java | 8 +- .../runtime/taskexecutor/TaskExecutor.java | 3 +- .../taskexecutor/TaskManagerConfiguration.java | 19 ++- .../flink/runtime/jobmanager/JobManager.scala | 6 +- .../runtime/minicluster/FlinkMiniCluster.scala | 8 +- .../flink/runtime/taskmanager/TaskManager.scala | 2 +- .../BlobLibraryCacheManagerTest.java | 8 +- .../BlobLibraryCacheRecoveryITCase.java | 2 +- .../jobmanager/JobManagerHARecoveryTest.java | 5 +- .../flink/runtime/jobmaster/JobMasterTest.java | 6 +- .../JobManagerLeaderElectionTest.java | 3 +- 
...askManagerComponentsStartupShutdownTest.java | 4 +- pom.xml | 1 + .../test_streaming_classloader.sh | 99 +++++++++++ tools/travis_mvn_watchdog.sh | 6 + 27 files changed, 607 insertions(+), 99 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/docs/ops/config.md ---------------------------------------------------------------------- diff --git a/docs/ops/config.md b/docs/ops/config.md index e0b9d4d..9d2405e 100644 --- a/docs/ops/config.md +++ b/docs/ops/config.md @@ -74,6 +74,9 @@ without explicit scheme definition, such as `/user/USERNAME/in.txt`, is going to - `fs.hdfs.hadoopconf`: The absolute path to the Hadoop File System's (HDFS) configuration **directory** (OPTIONAL VALUE). Specifying this value allows programs to reference HDFS files using short URIs (`hdfs:///path/to/files`, without including the address and port of the NameNode in the file URI). Without this option, HDFS files can be accessed, but require fully qualified URIs like `hdfs://address:port/path/to/files`. This option also causes file writers to pick up the HDFS's default values for block sizes and replication factors. Flink will look for the "core-site.xml" and "hdfs-site.xml" files in the specified directory. +- `classloader.resolve-order`: Whether Flink should use a child-first `ClassLoader` when loading +user-code classes or a parent-first `ClassLoader`. Can be one of `parent-first` or `child-first`. (default: `child-first`) + ## Advanced Options ### Compute @@ -186,7 +189,7 @@ will be used under the directory specified by jobmanager.web.tmpdir. - `state.backend.fs.checkpointdir`: Directory for storing checkpoints in a Flink supported filesystem. Note: State backend must be accessible from the JobManager, use `file://` only for local setups. 
-- `state.backend.rocksdb.checkpointdir`: The local directory for storing RocksDB files, or a list of directories separated by the systems directory delimiter (for example ‘:’ (colon) on Linux/Unix). (DEFAULT value is `taskmanager.tmp.dirs`) +- `state.backend.rocksdb.checkpointdir`: The local directory for storing RocksDB files, or a list of directories separated by the systems directory delimiter (for example ':' (colon) on Linux/Unix). (DEFAULT value is `taskmanager.tmp.dirs`) - `state.checkpoints.dir`: The target directory for meta data of [externalized checkpoints]({{ site.baseurl }}/ops/state/checkpoints.html#externalized-checkpoints). http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java b/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java index ae94ece..768de87 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java @@ -19,7 +19,7 @@ package org.apache.flink.client.program; import org.apache.flink.api.common.Plan; -import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader; +import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; import java.io.File; import java.io.IOException; @@ -133,6 +133,6 @@ public class JobWithJars { for (int i = 0; i < classpaths.size(); i++) { urls[i + jars.size()] = classpaths.get(i); } - return new FlinkUserCodeClassLoader(urls, parent); + return FlinkUserCodeClassLoaders.parentFirst(urls, parent); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java index 4ec6532..1dbc05e 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java @@ -18,6 +18,8 @@ package org.apache.flink.contrib.streaming.state; +import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; + import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -25,7 +27,6 @@ import org.rocksdb.RocksDB; import java.lang.reflect.Method; import java.net.URL; -import java.net.URLClassLoader; import static org.junit.Assert.assertNotEquals; @@ -46,8 +47,8 @@ public class RocksDbMultiClassLoaderTest { final URL codePath2 = RocksDB.class.getProtectionDomain().getCodeSource().getLocation(); final ClassLoader parent = getClass().getClassLoader(); - final ClassLoader loader1 = new ChildFirstClassLoader(new URL[] { codePath1, codePath2 }, parent); - final ClassLoader loader2 = new ChildFirstClassLoader(new URL[] { codePath1, codePath2 }, parent); + final ClassLoader loader1 = FlinkUserCodeClassLoaders.childFirst(new URL[] { codePath1, codePath2 }, parent); + final ClassLoader loader2 = FlinkUserCodeClassLoaders.childFirst(new URL[] { codePath1, codePath2 }, parent); final String className = RocksDBStateBackend.class.getName(); @@ -69,32 +70,4 @@ public class RocksDbMultiClassLoaderTest { meth1.invoke(instance1, tempDir); meth2.invoke(instance2, tempDir); } - - // ------------------------------------------------------------------------ - - /** - * A variant of the 
URLClassLoader that first loads from the URLs and only after that from the parent. - */ - private static final class ChildFirstClassLoader extends URLClassLoader { - - private final ClassLoader parent; - - public ChildFirstClassLoader(URL[] urls, ClassLoader parent) { - super(urls, null); - this.parent = parent; - } - - @Override - public Class<?> findClass(String name) throws ClassNotFoundException { - // first try to load from the URLs - // because the URLClassLoader's parent is null, this cannot implicitly load from the parent - try { - return super.findClass(name); - } - catch (ClassNotFoundException e) { - // not in the URL, check the parent - return parent.loadClass(name); - } - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java index 80d610a..d1005c4 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java @@ -27,6 +27,14 @@ public class CoreOptions { // process parameters // ------------------------------------------------------------------------ + public static final ConfigOption<String> CLASSLOADER_RESOLVE_ORDER = ConfigOptions + .key("classloader.resolve-order") + .defaultValue("child-first"); + + // ------------------------------------------------------------------------ + // process parameters + // ------------------------------------------------------------------------ + public static final ConfigOption<String> FLINK_JVM_OPTIONS = ConfigOptions .key("env.java.opts") .defaultValue(""); http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-end-to-end-tests/pom.xml 
---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml new file mode 100644 index 0000000..db05c36 --- /dev/null +++ b/flink-end-to-end-tests/pom.xml @@ -0,0 +1,107 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-parent</artifactId> + <version>1.4-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-end-to-end-tests_${scala.binary.version}</artifactId> + <name>flink-end-to-end-tests</name> + + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>2.4</version> + + <executions> + <!-- ClassLoaderTestProgram --> + <execution> + <id>ClassLoaderTestProgram</id> + <phase>package</phase> + <goals> + <goal>jar</goal> + </goals> + <configuration> + <classifier>ClassLoaderTestProgram</classifier> + + <archive> + <manifestEntries> + <program-class>org.apache.flink.streaming.tests.ClassLoaderTestProgram</program-class> + </manifestEntries> + </archive> + + <includes> + <include>org/apache/flink/streaming/tests/ClassLoaderTestProgram.class</include> + <include>org/apache/flink/runtime/taskmanager/TaskManager.class</include> + <include>.version.properties</include> + </includes> + </configuration> + </execution> + </executions> + </plugin> + + <!--simplify the name of the testing JARs for referring to them in the end-to-end test scripts--> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-antrun-plugin</artifactId> + <version>1.7</version> + 
<executions> + <execution> + <id>rename</id> + <phase>package</phase> + <goals> + <goal>run</goal> + </goals> + <configuration> + <target> + <copy file="${project.basedir}/target/flink-end-to-end-tests_${scala.binary.version}-${project.version}-ClassLoaderTestProgram.jar" tofile="${project.basedir}/target/ClassLoaderTestProgram.jar" /> + </target> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-end-to-end-tests/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java b/flink-end-to-end-tests/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java new file mode 100644 index 0000000..3626885 --- /dev/null +++ b/flink-end-to-end-tests/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskmanager; + +/** + * A {@code Taskmanager} in the same package as the proper Flink {@link TaskManager}. 
We use this + * to check whether Flink correctly uses the child-first {@link ClassLoader} when configured to do + * so. + */ +public class TaskManager { + public static String getMessage() { + return "Hello, World!"; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/ClassLoaderTestProgram.java ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/ClassLoaderTestProgram.java b/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/ClassLoaderTestProgram.java new file mode 100644 index 0000000..1d4ca4c --- /dev/null +++ b/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/ClassLoaderTestProgram.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.tests; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import java.io.InputStream; +import java.net.URL; +import java.util.Enumeration; +import java.util.Properties; + +/** + * End-to-end test program for verifying that the {@code classloader.resolve-order} setting + * is being honored by Flink. We test this by creating a fake {@code TaskManager} with a single + * method that we call in the same package as the original Flink {@code TaskManager} and verify that + * we get a {@link NoSuchMethodError} if we're running with {@code parent-first} class loading + * and that we get the correct result from the method when we're running with {@code child-first} + * class loading. + */ +public class ClassLoaderTestProgram { + + public static void main(String[] args) throws Exception { + + final ParameterTool params = ParameterTool.fromArgs(args); + + final String resolveOrder = params.getRequired("resolve-order"); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + env + .fromElements("Hello") + .map((MapFunction<String, String>) value -> { + + String gitUrl; + + try (InputStream propFile = ClassLoaderTestProgram.class.getClassLoader().getResourceAsStream(".version.properties")) { + Properties properties = new Properties(); + properties.load(propFile); + gitUrl = properties.getProperty("git.remote.origin.url"); + } + + Enumeration<URL> resources = ClassLoaderTestProgram.class.getClassLoader().getResources( + ".version.properties"); + + StringBuilder sortedProperties = new StringBuilder(); + while (resources.hasMoreElements()) { + URL url = resources.nextElement(); + try (InputStream in = url.openStream()) { + Properties properties = new 
Properties(); + properties.load(in); + String orderedGitUrl = properties.getProperty("git.remote.origin.url"); + sortedProperties.append(orderedGitUrl); + } + } + + if (resolveOrder.equals("parent-first")) { + try { + @SuppressWarnings("unused") + String ignored = TaskManager.getMessage(); + + throw new RuntimeException( + "TaskManager.getMessage() should not be available with parent-first " + + "ClassLoader order."); + + } catch (NoSuchMethodError e) { + // expected + } + return "NoSuchMethodError:" + gitUrl + ":" + sortedProperties; + } else if (resolveOrder.equals("child-first")) { + String message = TaskManager.getMessage(); + if (!message.equals("Hello, World!")) { + throw new RuntimeException("Wrong message from fake TaskManager."); + } + return message + ":" + gitUrl + ":" + sortedProperties; + } else { + throw new RuntimeException("Unknown resolve order: " + resolveOrder); + } + }) + .writeAsText(params.getRequired("output"), FileSystem.WriteMode.OVERWRITE); + + env.execute("ClassLoader Test Program"); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-end-to-end-tests/src/main/resources/.version.properties ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/src/main/resources/.version.properties b/flink-end-to-end-tests/src/main/resources/.version.properties new file mode 100644 index 0000000..dc98aea --- /dev/null +++ b/flink-end-to-end-tests/src/main/resources/.version.properties @@ -0,0 +1 @@ +git.remote.origin.url=hello-there-42 http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java index 425461c..a30b711 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java @@ -34,7 +34,7 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.ListeningBehaviour; import org.apache.flink.runtime.blob.BlobCache; import org.apache.flink.runtime.blob.BlobKey; -import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader; +import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmaster.JobManagerGateway; @@ -250,7 +250,7 @@ public class JobClient { allURLs[pos++] = url; } - return new FlinkUserCodeClassLoader(allURLs, JobClient.class.getClassLoader()); + return FlinkUserCodeClassLoaders.parentFirst(allURLs, JobClient.class.getClassLoader()); } else { throw new JobRetrievalException(jobID, "Couldn't retrieve class loader. 
Job " + jobID + " not found"); } http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java index c8fc4e4..038d10f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java @@ -31,6 +31,7 @@ import javax.annotation.Nullable; import java.io.IOException; import java.net.URL; +import java.net.URLClassLoader; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -63,10 +64,16 @@ public class BlobLibraryCacheManager implements LibraryCacheManager { /** The blob service to download libraries */ private final BlobService blobService; + /** The resolve order to use when creating a {@link ClassLoader}. 
*/ + private final FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder; + // -------------------------------------------------------------------------------------------- - public BlobLibraryCacheManager(BlobService blobService) { + public BlobLibraryCacheManager( + BlobService blobService, + FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder) { this.blobService = checkNotNull(blobService); + this.classLoaderResolveOrder = checkNotNull(classLoaderResolveOrder); } @Override @@ -112,7 +119,7 @@ public class BlobLibraryCacheManager implements LibraryCacheManager { } cacheEntries.put(jobId, new LibraryCacheEntry( - requiredJarFiles, requiredClasspaths, urls, task)); + requiredJarFiles, requiredClasspaths, urls, task, classLoaderResolveOrder)); } catch (Throwable t) { // rethrow or wrap ExceptionUtils.tryRethrowIOException(t); @@ -148,7 +155,7 @@ public class BlobLibraryCacheManager implements LibraryCacheManager { // else has already been unregistered } } - + @Override public ClassLoader getClassLoader(JobID jobId) { checkNotNull(jobId, "The JobId must not be null."); @@ -204,7 +211,7 @@ public class BlobLibraryCacheManager implements LibraryCacheManager { */ private static class LibraryCacheEntry { - private final FlinkUserCodeClassLoader classLoader; + private final URLClassLoader classLoader; private final Set<ExecutionAttemptID> referenceHolders; /** @@ -242,9 +249,15 @@ public class BlobLibraryCacheManager implements LibraryCacheManager { Collection<BlobKey> requiredLibraries, Collection<URL> requiredClasspaths, URL[] libraryURLs, - ExecutionAttemptID initialReference) { + ExecutionAttemptID initialReference, + FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder) { + + this.classLoader = + FlinkUserCodeClassLoaders.create( + classLoaderResolveOrder, + libraryURLs, + FlinkUserCodeClassLoaders.class.getClassLoader()); - this.classLoader = new FlinkUserCodeClassLoader(libraryURLs); // NOTE: do not store the class paths, i.e. 
URLs, into a set for performance reasons // see http://findbugs.sourceforge.net/bugDescriptions.html#DMI_COLLECTION_OF_URLS // -> alternatively, compare their string representation http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader.java deleted file mode 100644 index 015f6c7..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.runtime.execution.librarycache; - -import java.net.URL; -import java.net.URLClassLoader; - -/** - * Gives the URLClassLoader a nicer name for debugging purposes. 
- */ -public class FlinkUserCodeClassLoader extends URLClassLoader { - - public FlinkUserCodeClassLoader(URL[] urls) { - this(urls, FlinkUserCodeClassLoader.class.getClassLoader()); - } - - public FlinkUserCodeClassLoader(URL[] urls, ClassLoader parent) { - super(urls, parent); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java new file mode 100644 index 0000000..ef36c36 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.execution.librarycache; + +import java.io.IOException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.Iterator; +import java.util.List; + +/** + * Factory methods for creating Flink user-code {@link URLClassLoader}s that resolve classes either parent-first or child-first. + */ +public class FlinkUserCodeClassLoaders { + + public static URLClassLoader parentFirst(URL[] urls) { + return new ParentFirstClassLoader(urls); + } + + public static URLClassLoader parentFirst(URL[] urls, ClassLoader parent) { + return new ParentFirstClassLoader(urls, parent); + } + + public static URLClassLoader childFirst(URL[] urls, ClassLoader parent) { + return new ChildFirstClassLoader(urls, parent); + } + + public static URLClassLoader create( + ResolveOrder resolveOrder, URL[] urls, ClassLoader parent) { + + switch (resolveOrder) { + case CHILD_FIRST: + return childFirst(urls, parent); + case PARENT_FIRST: + return parentFirst(urls, parent); + default: + throw new IllegalArgumentException("Unkown class resolution order: " + resolveOrder); + } + } + + /** + * Class resolution order for Flink URL {@link ClassLoader}. + */ + public enum ResolveOrder { + CHILD_FIRST, PARENT_FIRST; + + public static ResolveOrder fromString(String resolveOrder) { + if (resolveOrder.equalsIgnoreCase("parent-first")) { + return PARENT_FIRST; + } else if (resolveOrder.equalsIgnoreCase("child-first")) { + return CHILD_FIRST; + } else { + throw new IllegalArgumentException("Unknown resolve order: " + resolveOrder); + } + } + } + + /** + * Regular URLClassLoader that first loads from the parent and only after that from the URLs.
+ */ + static class ParentFirstClassLoader extends URLClassLoader { + + ParentFirstClassLoader(URL[] urls) { + this(urls, FlinkUserCodeClassLoaders.class.getClassLoader()); + } + + ParentFirstClassLoader(URL[] urls, ClassLoader parent) { + super(urls, parent); + } + } + + /** + * A variant of the URLClassLoader that first loads from the URLs and only after that from the parent. + * + * <p>{@link #getResourceAsStream(String)} uses {@link #getResource(String)} internally so we + * don't override that. + */ + static final class ChildFirstClassLoader extends URLClassLoader { + + public ChildFirstClassLoader(URL[] urls, ClassLoader parent) { + super(urls, parent); + } + + @Override + protected synchronized Class<?> loadClass( + String name, boolean resolve) throws ClassNotFoundException { + + // First, check if the class has already been loaded + Class<?> c = findLoadedClass(name); + + if (c == null) { + try { + // check the URLs + c = findClass(name); + } catch (ClassNotFoundException e) { + // let URLClassLoader do it, which will eventually call the parent + c = super.loadClass(name, resolve); + } + } + + if (resolve) { + resolveClass(c); + } + + return c; + } + + @Override + public URL getResource(String name) { + // first, try and find it via the URLClassloader + URL urlClassLoaderResource = findResource(name); + + if (urlClassLoaderResource != null) { + return urlClassLoaderResource; + } + + // delegate to super + return super.getResource(name); + } + + @Override + public Enumeration<URL> getResources(String name) throws IOException { + // first get resources from URLClassloader + Enumeration<URL> urlClassLoaderResources = findResources(name); + + final List<URL> result = new ArrayList<>(); + + while (urlClassLoaderResources.hasMoreElements()) { + result.add(urlClassLoaderResources.nextElement()); + } + + // get parent urls + Enumeration<URL> parentResources = getParent().getResources(name); + + while (parentResources.hasMoreElements()) { + 
result.add(parentResources.nextElement()); + } + + return new Enumeration<URL>() { + Iterator<URL> iter = result.iterator(); + + public boolean hasMoreElements() { + return iter.hasNext(); + } + + public URL nextElement() { + return iter.next(); + } + }; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java index 57aeaff..f7daabb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java @@ -20,10 +20,12 @@ package org.apache.flink.runtime.jobmaster; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; +import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; import org.apache.flink.runtime.util.ExecutorThreadFactory; import org.apache.flink.runtime.util.Hardware; @@ -111,7 +113,11 @@ public class JobManagerServices { Preconditions.checkNotNull(config); Preconditions.checkNotNull(blobServer); - final BlobLibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(blobServer); + final String classLoaderResolveOrder = + config.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER); + + final BlobLibraryCacheManager libraryCacheManager = 
+ new BlobLibraryCacheManager(blobServer, FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder)); final FiniteDuration timeout; try { http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index b6a0637..859ffbb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -941,7 +941,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { blobServerAddress, taskManagerConfiguration.getConfiguration(), haServices.createBlobStore()); - libraryCacheManager = new BlobLibraryCacheManager(blobCache); + libraryCacheManager = + new BlobLibraryCacheManager(blobCache, taskManagerConfiguration.getClassLoaderResolveOrder()); } catch (IOException e) { // Can't pass the IOException up - we need a RuntimeException anyway // two levels up where this is run asynchronously. 
Also, we don't http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java index 7c7693b..60dd643 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java @@ -23,9 +23,11 @@ import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.configuration.UnmodifiableConfiguration; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.util.Preconditions; @@ -58,6 +60,8 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo { private final boolean exitJvmOnOutOfMemory; + private final FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder; + public TaskManagerConfiguration( int numberSlots, String[] tmpDirectories, @@ -68,7 +72,8 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo { Time refusedRegistrationPause, long cleanupInterval, Configuration configuration, - boolean exitJvmOnOutOfMemory) { + boolean exitJvmOnOutOfMemory, + FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder) { this.numberSlots = numberSlots; this.tmpDirectories = 
Preconditions.checkNotNull(tmpDirectories); @@ -79,6 +84,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo { this.refusedRegistrationPause = Preconditions.checkNotNull(refusedRegistrationPause); this.configuration = new UnmodifiableConfiguration(Preconditions.checkNotNull(configuration)); this.exitJvmOnOutOfMemory = exitJvmOnOutOfMemory; + this.classLoaderResolveOrder = classLoaderResolveOrder; } public int getNumberSlots() { @@ -120,6 +126,10 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo { return exitJvmOnOutOfMemory; } + public FlinkUserCodeClassLoaders.ResolveOrder getClassLoaderResolveOrder() { + return classLoaderResolveOrder; + } + // -------------------------------------------------------------------------------------------- // Static factory methods // -------------------------------------------------------------------------------------------- @@ -212,6 +222,10 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo { final boolean exitOnOom = configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY); + final String classLoaderResolveOrder = + configuration.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER); + + return new TaskManagerConfiguration( numberSlots, tmpDirPaths, @@ -222,6 +236,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo { refusedRegistrationPause, cleanupInterval, configuration, - exitOnOom); + exitOnOom, + FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 276e0ff..67ffb32 100644 --- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -47,6 +47,7 @@ import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceMa import org.apache.flink.runtime.clusterframework.types.ResourceID import org.apache.flink.runtime.concurrent.{FutureUtils, Executors => FlinkExecutors} import org.apache.flink.runtime.execution.SuppressRestartsException +import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders.ResolveOrder import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, LibraryCacheManager} import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory import org.apache.flink.runtime.executiongraph._ @@ -2467,6 +2468,8 @@ object JobManager { val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration) + val classLoaderResolveOrder = configuration.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER) + val restartStrategy = RestartStrategyFactory.createRestartStrategyFactory(configuration) val archiveCount = configuration.getInteger(WebOptions.ARCHIVE_COUNT) @@ -2497,7 +2500,8 @@ object JobManager { blobServer = new BlobServer(configuration, blobStore) instanceManager = new InstanceManager() scheduler = new FlinkScheduler(ExecutionContext.fromExecutor(futureExecutor)) - libraryCacheManager = new BlobLibraryCacheManager(blobServer) + libraryCacheManager = + new BlobLibraryCacheManager(blobServer, ResolveOrder.fromString(classLoaderResolveOrder)) instanceManager.addInstanceListener(scheduler) } http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index 7f4308e..cedf607 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -18,7 +18,7 @@ package org.apache.flink.runtime.minicluster -import java.net.URL +import java.net.{URL, URLClassLoader} import java.util.UUID import java.util.concurrent.{Executors, TimeUnit} @@ -33,7 +33,7 @@ import org.apache.flink.core.fs.Path import org.apache.flink.runtime.akka.{AkkaJobManagerGateway, AkkaUtils} import org.apache.flink.runtime.client.{JobClient, JobExecutionException} import org.apache.flink.runtime.concurrent.{FutureUtils, Executors => FlinkExecutors} -import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader +import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, HighAvailabilityServicesUtils} import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway} import org.apache.flink.runtime.jobgraph.JobGraph @@ -668,7 +668,7 @@ abstract class FlinkMiniCluster( private def createUserCodeClassLoader( jars: java.util.List[Path], classPaths: java.util.List[URL], - parentClassLoader: ClassLoader): FlinkUserCodeClassLoader = { + parentClassLoader: ClassLoader): URLClassLoader = { val urls = new Array[URL](jars.size() + classPaths.size()) @@ -686,6 +686,6 @@ abstract class FlinkMiniCluster( counter += 1 } - new FlinkUserCodeClassLoader(urls, parentClassLoader) + FlinkUserCodeClassLoaders.parentFirst(urls, parentClassLoader) } } http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 431adb6..94f375a 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -970,7 +970,7 @@ class TaskManager( highAvailabilityServices.createBlobStore()) blobCache = Option(blobcache) libraryCacheManager = Some( - new BlobLibraryCacheManager(blobcache)) + new BlobLibraryCacheManager(blobcache, config.getClassLoaderResolveOrder())) } catch { case e: Exception => http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java index a4b48e8..3241b86 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java @@ -95,7 +95,7 @@ public class BlobLibraryCacheManagerTest extends TestLogger { bc.close(); - libCache = new BlobLibraryCacheManager(cache); + libCache = new BlobLibraryCacheManager(cache, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST); cache.registerJob(jobId1); cache.registerJob(jobId2); @@ -227,7 +227,7 @@ public class BlobLibraryCacheManagerTest extends TestLogger { bc.close(); - libCache = new BlobLibraryCacheManager(cache); + libCache = new BlobLibraryCacheManager(cache, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST); cache.registerJob(jobId); assertEquals(0, 
libCache.getNumberOfManagedJobs()); @@ -341,7 +341,7 @@ public class BlobLibraryCacheManagerTest extends TestLogger { bc.close(); - libCache = new BlobLibraryCacheManager(cache); + libCache = new BlobLibraryCacheManager(cache, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST); cache.registerJob(jobId); assertEquals(0, libCache.getNumberOfManagedJobs()); @@ -448,7 +448,7 @@ public class BlobLibraryCacheManagerTest extends TestLogger { BlobKey dataKey2 = uploader.put(jobId, new byte[]{11, 12, 13, 14, 15, 16, 17, 18}); uploader.close(); - libCache = new BlobLibraryCacheManager(cache); + libCache = new BlobLibraryCacheManager(cache, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST); assertEquals(0, libCache.getNumberOfManagedJobs()); checkFileCountForJob(2, jobId, server); checkFileCountForJob(0, jobId, cache); http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java index e52310e..979b940 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java @@ -86,7 +86,7 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger { for (int i = 0; i < server.length; i++) { server[i] = new BlobServer(config, blobStoreService); serverAddress[i] = new InetSocketAddress("localhost", server[i].getPort()); - libServer[i] = new BlobLibraryCacheManager(server[i]); + libServer[i] = new BlobLibraryCacheManager(server[i], FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST); } // 
Random data http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java index 72f3a88..d0af88d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java @@ -39,6 +39,7 @@ import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; +import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy; import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; @@ -195,7 +196,7 @@ public class JobManagerHARecoveryTest extends TestLogger { instanceManager, scheduler, blobServer, - new BlobLibraryCacheManager(blobServer), + new BlobLibraryCacheManager(blobServer, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST), archive, new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100), timeout, @@ -368,7 +369,7 @@ public class JobManagerHARecoveryTest extends TestLogger { mock(InstanceManager.class), mock(Scheduler.class), blobServer, - new BlobLibraryCacheManager(blobServer), + new BlobLibraryCacheManager(blobServer, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST), ActorRef.noSender(), new 
FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100), timeout, http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index 64cc13b..0f21b55 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -26,7 +26,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; -import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader; +import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices; @@ -112,7 +112,7 @@ public class JobMasterTest extends TestLogger { null, mock(OnCompletionActions.class), testingFatalErrorHandler, - new FlinkUserCodeClassLoader(new URL[0])); + FlinkUserCodeClassLoaders.parentFirst(new URL[0], JobMasterTest.class.getClassLoader())); CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout); @@ -213,7 +213,7 @@ public class JobMasterTest extends TestLogger { null, mock(OnCompletionActions.class), testingFatalErrorHandler, - new FlinkUserCodeClassLoader(new URL[0])); + FlinkUserCodeClassLoaders.parentFirst(new URL[0], JobMasterTest.class.getClassLoader())); 
CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout); http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java index b853b14..5ff6022 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.blob.VoidBlobStore; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; +import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.instance.InstanceManager; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; @@ -189,7 +190,7 @@ public class JobManagerLeaderElectionTest extends TestLogger { new InstanceManager(), new Scheduler(TestingUtils.defaultExecutionContext()), blobServer, - new BlobLibraryCacheManager(blobServer), + new BlobLibraryCacheManager(blobServer, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST), ActorRef.noSender(), new NoRestartStrategy.NoRestartStrategyFactory(), AkkaUtils.getDefaultTimeoutAsFiniteDuration(), http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java index 410b93e..052699a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.FlinkResourceManager; import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices; import org.apache.flink.runtime.io.disk.iomanager.IOManager; @@ -121,7 +122,8 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger { Time.seconds(10), 1000000, // cleanup interval config, - false); // exit-jvm-on-fatal-error + false, // exit-jvm-on-fatal-error + FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST); final int networkBufNum = 32; // note: the network buffer memory configured here is not actually used below but set http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 229e93d..567f8f1 100644 --- a/pom.xml +++ b/pom.xml @@ -68,6 +68,7 @@ under the License. 
<module>flink-examples</module> <module>flink-clients</module> <module>flink-tests</module> + <module>flink-end-to-end-tests</module> <module>flink-test-utils-parent</module> <module>flink-libraries</module> <module>flink-scala-shell</module> http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/test-infra/end-to-end-test/test_streaming_classloader.sh ---------------------------------------------------------------------- diff --git a/test-infra/end-to-end-test/test_streaming_classloader.sh b/test-infra/end-to-end-test/test_streaming_classloader.sh new file mode 100755 index 0000000..efbf98e --- /dev/null +++ b/test-infra/end-to-end-test/test_streaming_classloader.sh @@ -0,0 +1,99 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + + +set -e +set -o pipefail + +# Convert relative path to absolute path +TEST_ROOT=`pwd` +TEST_INFRA_DIR="$0" +TEST_INFRA_DIR=`dirname "$TEST_INFRA_DIR"` +cd $TEST_INFRA_DIR +TEST_INFRA_DIR=`pwd` +cd $TEST_ROOT + +. 
"$TEST_INFRA_DIR"/common.sh + +TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/target/ClassLoaderTestProgram.jar + +# kill any remaining JobManagers/TaskManagers at the end +trap 'pkill -f "JobManager|TaskManager"' EXIT + +echo "Testing parent-first class loading" + +# retrieve git.remote.origin.url from .version.properties +GIT_REMOTE_URL=`grep "git\.remote\.origin\.url" $TEST_INFRA_DIR/../../flink-runtime/src/main/resources/.version.properties \ + |cut -d'=' -f2 \ + |sed -e 's/\\\:/:/g'` + +# remove any leftover classloader settings +sed -i -e 's/classloader.resolve-order: .*//' "$FLINK_DIR/conf/flink-conf.yaml" +echo "classloader.resolve-order: parent-first" >> "$FLINK_DIR/conf/flink-conf.yaml" + +start_cluster + +$FLINK_DIR/bin/flink run -p 1 $TEST_PROGRAM_JAR --resolve-order parent-first --output $TEST_DATA_DIR/out/cl_out_pf + +stop_cluster + +# remove classloader settings again +sed -i -e 's/classloader.resolve-order: .*//' $FLINK_DIR/conf/flink-conf.yaml + +OUTPUT=`cat $TEST_DATA_DIR/out/cl_out_pf` +# first field: whether we found the method on TaskManager +# result of getResource(".version.properties"), should be from the parent +# ordered result of getResources(".version.properties"), should have parent first +EXPECTED="NoSuchMethodError:${GIT_REMOTE_URL}:${GIT_REMOTE_URL}hello-there-42" +if [[ "$OUTPUT" != "$EXPECTED" ]]; then + echo "Output from Flink program does not match expected output." 
+ echo -e "EXPECTED: $EXPECTED" + echo -e "ACTUAL: $OUTPUT" + PASS="" +fi + +echo "Testing child-first class loading" + +# remove any leftover classloader settings +sed -i -e 's/classloader.resolve-order: .*//' "$FLINK_DIR/conf/flink-conf.yaml" +echo "classloader.resolve-order: child-first" >> "$FLINK_DIR/conf/flink-conf.yaml" + +start_cluster + +$FLINK_DIR/bin/flink run -p 1 $TEST_PROGRAM_JAR --resolve-order child-first --output $TEST_DATA_DIR/out/cl_out_cf + +stop_cluster + +# remove classloader settings again +sed -i -e 's/classloader.resolve-order: .*//' $FLINK_DIR/conf/flink-conf.yaml + +OUTPUT=`cat $TEST_DATA_DIR/out/cl_out_cf` +# first field: whether we found the method on TaskManager +# result of getResource(".version.properties"), should be from the child +# ordered result of getResources(".version.properties"), should be child first +EXPECTED="Hello, World!:hello-there-42:hello-there-42${GIT_REMOTE_URL}" +if [[ "$OUTPUT" != "$EXPECTED" ]]; then + echo "Output from Flink program does not match expected output." 
+ echo -e "EXPECTED: $EXPECTED" + echo -e "ACTUAL: $OUTPUT" + PASS="" +fi + +clean_data_dir +check_all_pass http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/tools/travis_mvn_watchdog.sh ---------------------------------------------------------------------- diff --git a/tools/travis_mvn_watchdog.sh b/tools/travis_mvn_watchdog.sh index 5232c64..77bad24 100755 --- a/tools/travis_mvn_watchdog.sh +++ b/tools/travis_mvn_watchdog.sh @@ -423,6 +423,12 @@ case $TEST in printf "==============================================================================\n" test-infra/end-to-end-test/test_streaming_kafka010.sh build-target cluster EXIT_CODE=$(($EXIT_CODE+$?)) + + printf "\n==============================================================================\n" + printf "Running class loading end-to-end test\n" + printf "==============================================================================\n" + test-infra/end-to-end-test/test_streaming_classloader.sh build-target cluster + EXIT_CODE=$(($EXIT_CODE+$?)) else printf "\n==============================================================================\n" printf "Previous build failure detected, skipping end-to-end tests.\n"
