Repository: flink
Updated Branches:
  refs/heads/master 2a4ac6600 -> a86b64686


[FLINK-7442] Add option for using a child-first classloader for loading user 
code

This also adds an end-to-end test that verifies correct order for both
classes and resources.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a86b6468
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a86b6468
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a86b6468

Branch: refs/heads/master
Commit: a86b6468617f7078c375f6484975b59477aab193
Parents: 2a4ac66
Author: Aljoscha Krettek <[email protected]>
Authored: Mon Aug 14 14:53:14 2017 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Thu Sep 21 15:01:48 2017 +0200

----------------------------------------------------------------------
 docs/ops/config.md                              |   5 +-
 .../flink/client/program/JobWithJars.java       |   4 +-
 .../state/RocksDbMultiClassLoaderTest.java      |  35 +---
 .../apache/flink/configuration/CoreOptions.java |   8 +
 flink-end-to-end-tests/pom.xml                  | 107 ++++++++++++
 .../flink/runtime/taskmanager/TaskManager.java  |  30 ++++
 .../streaming/tests/ClassLoaderTestProgram.java | 102 +++++++++++
 .../src/main/resources/.version.properties      |   1 +
 .../apache/flink/runtime/client/JobClient.java  |   4 +-
 .../librarycache/BlobLibraryCacheManager.java   |  25 ++-
 .../librarycache/FlinkUserCodeClassLoader.java  |  35 ----
 .../librarycache/FlinkUserCodeClassLoaders.java | 170 +++++++++++++++++++
 .../runtime/jobmaster/JobManagerServices.java   |   8 +-
 .../runtime/taskexecutor/TaskExecutor.java      |   3 +-
 .../taskexecutor/TaskManagerConfiguration.java  |  19 ++-
 .../flink/runtime/jobmanager/JobManager.scala   |   6 +-
 .../runtime/minicluster/FlinkMiniCluster.scala  |   8 +-
 .../flink/runtime/taskmanager/TaskManager.scala |   2 +-
 .../BlobLibraryCacheManagerTest.java            |   8 +-
 .../BlobLibraryCacheRecoveryITCase.java         |   2 +-
 .../jobmanager/JobManagerHARecoveryTest.java    |   5 +-
 .../flink/runtime/jobmaster/JobMasterTest.java  |   6 +-
 .../JobManagerLeaderElectionTest.java           |   3 +-
 ...askManagerComponentsStartupShutdownTest.java |   4 +-
 pom.xml                                         |   1 +
 .../test_streaming_classloader.sh               |  99 +++++++++++
 tools/travis_mvn_watchdog.sh                    |   6 +
 27 files changed, 607 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/docs/ops/config.md
----------------------------------------------------------------------
diff --git a/docs/ops/config.md b/docs/ops/config.md
index e0b9d4d..9d2405e 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -74,6 +74,9 @@ without explicit scheme definition, such as 
`/user/USERNAME/in.txt`, is going to
 
 - `fs.hdfs.hadoopconf`: The absolute path to the Hadoop File System's (HDFS) 
configuration **directory** (OPTIONAL VALUE). Specifying this value allows 
programs to reference HDFS files using short URIs (`hdfs:///path/to/files`, 
without including the address and port of the NameNode in the file URI). 
Without this option, HDFS files can be accessed, but require fully qualified 
URIs like `hdfs://address:port/path/to/files`. This option also causes file 
writers to pick up the HDFS's default values for block sizes and replication 
factors. Flink will look for the "core-site.xml" and "hdfs-site.xml" files in 
the specified directory.
 
+- `classloader.resolve-order`: Whether Flink should use a child-first 
`ClassLoader` when loading
+user-code classes or a parent-first `ClassLoader`. Can be one of 
`parent-first` or `child-first`. (default: `child-first`)
+
 ## Advanced Options
 
 ### Compute
@@ -186,7 +189,7 @@ will be used under the directory specified by 
jobmanager.web.tmpdir.
 
 - `state.backend.fs.checkpointdir`: Directory for storing checkpoints in a 
Flink supported filesystem. Note: State backend must be accessible from the 
JobManager, use `file://` only for local setups.
 
-- `state.backend.rocksdb.checkpointdir`:  The local directory for storing 
RocksDB files, or a list of directories separated by the systems directory 
delimiter (for example ‘:’ (colon) on Linux/Unix). (DEFAULT value is 
`taskmanager.tmp.dirs`)
+- `state.backend.rocksdb.checkpointdir`:  The local directory for storing 
RocksDB files, or a list of directories separated by the systems directory 
delimiter (for example ':' (colon) on Linux/Unix). (DEFAULT value is 
`taskmanager.tmp.dirs`)
 
 - `state.checkpoints.dir`: The target directory for meta data of [externalized 
checkpoints]({{ site.baseurl 
}}/ops/state/checkpoints.html#externalized-checkpoints).
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java 
b/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
index ae94ece..768de87 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
@@ -19,7 +19,7 @@
 package org.apache.flink.client.program;
 
 import org.apache.flink.api.common.Plan;
-import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
+import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 
 import java.io.File;
 import java.io.IOException;
@@ -133,6 +133,6 @@ public class JobWithJars {
                for (int i = 0; i < classpaths.size(); i++) {
                        urls[i + jars.size()] = classpaths.get(i);
                }
-               return new FlinkUserCodeClassLoader(urls, parent);
+               return FlinkUserCodeClassLoaders.parentFirst(urls, parent);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
index 4ec6532..1dbc05e 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.contrib.streaming.state;
 
+import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
+
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -25,7 +27,6 @@ import org.rocksdb.RocksDB;
 
 import java.lang.reflect.Method;
 import java.net.URL;
-import java.net.URLClassLoader;
 
 import static org.junit.Assert.assertNotEquals;
 
@@ -46,8 +47,8 @@ public class RocksDbMultiClassLoaderTest {
                final URL codePath2 = 
RocksDB.class.getProtectionDomain().getCodeSource().getLocation();
 
                final ClassLoader parent = getClass().getClassLoader();
-               final ClassLoader loader1 = new ChildFirstClassLoader(new URL[] 
{ codePath1, codePath2 }, parent);
-               final ClassLoader loader2 = new ChildFirstClassLoader(new URL[] 
{ codePath1, codePath2 }, parent);
+               final ClassLoader loader1 = 
FlinkUserCodeClassLoaders.childFirst(new URL[] { codePath1, codePath2 }, 
parent);
+               final ClassLoader loader2 = 
FlinkUserCodeClassLoaders.childFirst(new URL[] { codePath1, codePath2 }, 
parent);
 
                final String className = RocksDBStateBackend.class.getName();
 
@@ -69,32 +70,4 @@ public class RocksDbMultiClassLoaderTest {
                meth1.invoke(instance1, tempDir);
                meth2.invoke(instance2, tempDir);
        }
-
-       // 
------------------------------------------------------------------------
-
-       /**
-        * A variant of the URLClassLoader that first loads from the URLs and 
only after that from the parent.
-        */
-       private static final class ChildFirstClassLoader extends URLClassLoader 
{
-
-               private final ClassLoader parent;
-
-               public ChildFirstClassLoader(URL[] urls, ClassLoader parent) {
-                       super(urls, null);
-                       this.parent = parent;
-               }
-
-               @Override
-               public Class<?> findClass(String name) throws 
ClassNotFoundException {
-                       // first try to load from the URLs
-                       // because the URLClassLoader's parent is null, this 
cannot implicitly load from the parent
-                       try {
-                               return super.findClass(name);
-                       }
-                       catch (ClassNotFoundException e) {
-                               // not in the URL, check the parent
-                               return parent.loadClass(name);
-                       }
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index 80d610a..d1005c4 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -27,6 +27,14 @@ public class CoreOptions {
        //  process parameters
        // 
------------------------------------------------------------------------
 
+       public static final ConfigOption<String> CLASSLOADER_RESOLVE_ORDER = 
ConfigOptions
+               .key("classloader.resolve-order")
+               .defaultValue("child-first");
+
+       // 
------------------------------------------------------------------------
+       //  process parameters
+       // 
------------------------------------------------------------------------
+
        public static final ConfigOption<String> FLINK_JVM_OPTIONS = 
ConfigOptions
                .key("env.java.opts")
                .defaultValue("");

http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-end-to-end-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
new file mode 100644
index 0000000..db05c36
--- /dev/null
+++ b/flink-end-to-end-tests/pom.xml
@@ -0,0 +1,107 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+               xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-parent</artifactId>
+               <version>1.4-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-end-to-end-tests_${scala.binary.version}</artifactId>
+       <name>flink-end-to-end-tests</name>
+
+       <packaging>jar</packaging>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-jar-plugin</artifactId>
+                               <version>2.4</version>
+
+                               <executions>
+                                       <!-- ClassLoaderTestProgram -->
+                                       <execution>
+                                               <id>ClassLoaderTestProgram</id>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>jar</goal>
+                                               </goals>
+                                               <configuration>
+                                                       
<classifier>ClassLoaderTestProgram</classifier>
+
+                                                       <archive>
+                                                               
<manifestEntries>
+                                                                       
<program-class>org.apache.flink.streaming.tests.ClassLoaderTestProgram</program-class>
+                                                               
</manifestEntries>
+                                                       </archive>
+
+                                                       <includes>
+                                                               
<include>org/apache/flink/streaming/tests/ClassLoaderTestProgram.class</include>
+                                                               
<include>org/apache/flink/runtime/taskmanager/TaskManager.class</include>
+                                                               
<include>.version.properties</include>
+                                                       </includes>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+
+                       <!--simplify the name of the testing JARs for referring 
to them in the end-to-end test scripts-->
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-antrun-plugin</artifactId>
+                               <version>1.7</version>
+                               <executions>
+                                       <execution>
+                                               <id>rename</id>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>run</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <target>
+                                                               <copy 
file="${project.basedir}/target/flink-end-to-end-tests_${scala.binary.version}-${project.version}-ClassLoaderTestProgram.jar"
 tofile="${project.basedir}/target/ClassLoaderTestProgram.jar" />
+                                                       </target>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-end-to-end-tests/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git 
a/flink-end-to-end-tests/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
 
b/flink-end-to-end-tests/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
new file mode 100644
index 0000000..3626885
--- /dev/null
+++ 
b/flink-end-to-end-tests/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+/**
+ * A {@code Taskmanager} in the same package as the proper Flink {@link 
TaskManager}. We use this
+ * to check whether Flink correctly uses the child-first {@link ClassLoader} 
when configured to do
+ * so.
+ */
+public class TaskManager {
+       public static String getMessage() {
+               return "Hello, World!";
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/ClassLoaderTestProgram.java
----------------------------------------------------------------------
diff --git 
a/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/ClassLoaderTestProgram.java
 
b/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/ClassLoaderTestProgram.java
new file mode 100644
index 0000000..1d4ca4c
--- /dev/null
+++ 
b/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/ClassLoaderTestProgram.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.io.InputStream;
+import java.net.URL;
+import java.util.Enumeration;
+import java.util.Properties;
+
+/**
+ * End-to-end test program for verifying that the {@code 
classloader.resolve-order} setting
+ * is being honored by Flink. We test this by creating a fake {@code 
TaskManager} with a single
+ * method that we call in the same package as the original Flink {@code 
TaskManager} and verify that
+ * we get a {@link NoSuchMethodError} if we're running with {@code 
parent-first} class loading
+ * and that we get the correct result from the method when we're running with 
{@code child-first}
+ * class loading.
+ */
+public class ClassLoaderTestProgram {
+
+       public static void main(String[] args) throws Exception {
+
+               final ParameterTool params = ParameterTool.fromArgs(args);
+
+               final String resolveOrder = params.getRequired("resolve-order");
+
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               env
+                       .fromElements("Hello")
+                       .map((MapFunction<String, String>) value -> {
+
+                               String gitUrl;
+
+                               try (InputStream propFile = 
ClassLoaderTestProgram.class.getClassLoader().getResourceAsStream(".version.properties"))
 {
+                                       Properties properties = new 
Properties();
+                                       properties.load(propFile);
+                                       gitUrl = 
properties.getProperty("git.remote.origin.url");
+                               }
+
+                               Enumeration<URL> resources = 
ClassLoaderTestProgram.class.getClassLoader().getResources(
+                                       ".version.properties");
+
+                               StringBuilder sortedProperties = new 
StringBuilder();
+                               while (resources.hasMoreElements()) {
+                                       URL url = resources.nextElement();
+                                       try (InputStream in = url.openStream()) 
{
+                                               Properties properties = new 
Properties();
+                                               properties.load(in);
+                                               String orderedGitUrl = 
properties.getProperty("git.remote.origin.url");
+                                               
sortedProperties.append(orderedGitUrl);
+                                       }
+                               }
+
+                               if (resolveOrder.equals("parent-first")) {
+                                       try {
+                                               @SuppressWarnings("unused")
+                                               String ignored = 
TaskManager.getMessage();
+
+                                               throw new RuntimeException(
+                                                       
"TaskManager.getMessage() should not be available with parent-first " +
+                                                               "ClassLoader 
order.");
+
+                                       } catch (NoSuchMethodError e) {
+                                               // expected
+                                       }
+                                       return "NoSuchMethodError:" + gitUrl + 
":" + sortedProperties;
+                               } else if (resolveOrder.equals("child-first")) {
+                                       String message = 
TaskManager.getMessage();
+                                       if (!message.equals("Hello, World!")) {
+                                               throw new 
RuntimeException("Wrong message from fake TaskManager.");
+                                       }
+                                       return message + ":" + gitUrl + ":" + 
sortedProperties;
+                               } else {
+                                       throw new RuntimeException("Unknown 
resolve order: " + resolveOrder);
+                               }
+                       })
+                       .writeAsText(params.getRequired("output"), 
FileSystem.WriteMode.OVERWRITE);
+
+               env.execute("ClassLoader Test Program");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-end-to-end-tests/src/main/resources/.version.properties
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/src/main/resources/.version.properties 
b/flink-end-to-end-tests/src/main/resources/.version.properties
new file mode 100644
index 0000000..dc98aea
--- /dev/null
+++ b/flink-end-to-end-tests/src/main/resources/.version.properties
@@ -0,0 +1 @@
+git.remote.origin.url=hello-there-42

http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index 425461c..a30b711 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -34,7 +34,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
-import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
+import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
@@ -250,7 +250,7 @@ public class JobClient {
                                allURLs[pos++] = url;
                        }
 
-                       return new FlinkUserCodeClassLoader(allURLs, 
JobClient.class.getClassLoader());
+                       return FlinkUserCodeClassLoaders.parentFirst(allURLs, 
JobClient.class.getClassLoader());
                } else {
                        throw new JobRetrievalException(jobID, "Couldn't 
retrieve class loader. Job " + jobID + " not found");
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
index c8fc4e4..038d10f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
@@ -31,6 +31,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.net.URL;
+import java.net.URLClassLoader;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -63,10 +64,16 @@ public class BlobLibraryCacheManager implements 
LibraryCacheManager {
        /** The blob service to download libraries */
        private final BlobService blobService;
 
+       /** The resolve order to use when creating a {@link ClassLoader}. */
+       private final FlinkUserCodeClassLoaders.ResolveOrder 
classLoaderResolveOrder;
+
        // 
--------------------------------------------------------------------------------------------
 
-       public BlobLibraryCacheManager(BlobService blobService) {
+       public BlobLibraryCacheManager(
+                       BlobService blobService,
+                       FlinkUserCodeClassLoaders.ResolveOrder 
classLoaderResolveOrder) {
                this.blobService = checkNotNull(blobService);
+               this.classLoaderResolveOrder = 
checkNotNull(classLoaderResolveOrder);
        }
 
        @Override
@@ -112,7 +119,7 @@ public class BlobLibraryCacheManager implements 
LibraryCacheManager {
                                        }
 
                                        cacheEntries.put(jobId, new 
LibraryCacheEntry(
-                                               requiredJarFiles, 
requiredClasspaths, urls, task));
+                                               requiredJarFiles, 
requiredClasspaths, urls, task, classLoaderResolveOrder));
                                } catch (Throwable t) {
                                        // rethrow or wrap
                                        ExceptionUtils.tryRethrowIOException(t);
@@ -148,7 +155,7 @@ public class BlobLibraryCacheManager implements 
LibraryCacheManager {
                        // else has already been unregistered
                }
        }
-       
+
        @Override
        public ClassLoader getClassLoader(JobID jobId) {
                checkNotNull(jobId, "The JobId must not be null.");
@@ -204,7 +211,7 @@ public class BlobLibraryCacheManager implements 
LibraryCacheManager {
         */
        private static class LibraryCacheEntry {
 
-               private final FlinkUserCodeClassLoader classLoader;
+               private final URLClassLoader classLoader;
 
                private final Set<ExecutionAttemptID> referenceHolders;
                /**
@@ -242,9 +249,15 @@ public class BlobLibraryCacheManager implements 
LibraryCacheManager {
                                Collection<BlobKey> requiredLibraries,
                                Collection<URL> requiredClasspaths,
                                URL[] libraryURLs,
-                               ExecutionAttemptID initialReference) {
+                               ExecutionAttemptID initialReference,
+                               FlinkUserCodeClassLoaders.ResolveOrder 
classLoaderResolveOrder) {
+
+                       this.classLoader =
+                               FlinkUserCodeClassLoaders.create(
+                                       classLoaderResolveOrder,
+                                       libraryURLs,
+                                       
FlinkUserCodeClassLoaders.class.getClassLoader());
 
-                       this.classLoader = new 
FlinkUserCodeClassLoader(libraryURLs);
                        // NOTE: do not store the class paths, i.e. URLs, into 
a set for performance reasons
                        //       see 
http://findbugs.sourceforge.net/bugDescriptions.html#DMI_COLLECTION_OF_URLS
                        //       -> alternatively, compare their string 
representation

http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader.java
deleted file mode 100644
index 015f6c7..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.runtime.execution.librarycache;
-
-import java.net.URL;
-import java.net.URLClassLoader;
-
-/**
- * Gives the URLClassLoader a nicer name for debugging purposes.
- */
-public class FlinkUserCodeClassLoader extends URLClassLoader {
-
-       public FlinkUserCodeClassLoader(URL[] urls) {
-               this(urls, FlinkUserCodeClassLoader.class.getClassLoader());
-       }
-
-       public FlinkUserCodeClassLoader(URL[] urls, ClassLoader parent) {
-               super(urls, parent);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java
new file mode 100644
index 0000000..ef36c36
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.execution.librarycache;
+
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Gives the URLClassLoader a nicer name for debugging purposes.
+ */
+public class FlinkUserCodeClassLoaders {
+
+       public static URLClassLoader parentFirst(URL[] urls) {
+               return new ParentFirstClassLoader(urls);
+       }
+
+       public static URLClassLoader parentFirst(URL[] urls, ClassLoader 
parent) {
+               return new ParentFirstClassLoader(urls, parent);
+       }
+
+       public static URLClassLoader childFirst(URL[] urls, ClassLoader parent) 
{
+               return new ChildFirstClassLoader(urls, parent);
+       }
+
+       public static URLClassLoader create(
+               ResolveOrder resolveOrder, URL[] urls, ClassLoader parent) {
+
+               switch (resolveOrder) {
+                       case CHILD_FIRST:
+                               return childFirst(urls, parent);
+                       case PARENT_FIRST:
+                               return parentFirst(urls, parent);
+                       default:
+                               throw new IllegalArgumentException("Unkown 
class resolution order: " + resolveOrder);
+               }
+       }
+
+       /**
+        * Class resolution order for Flink URL {@link ClassLoader}.
+        */
+       public enum ResolveOrder {
+               CHILD_FIRST, PARENT_FIRST;
+
+               public static ResolveOrder fromString(String resolveOrder) {
+                       if (resolveOrder.equalsIgnoreCase("parent-first")) {
+                               return PARENT_FIRST;
+                       } else if 
(resolveOrder.equalsIgnoreCase("child-first")) {
+                               return CHILD_FIRST;
+                       } else {
+                               throw new IllegalArgumentException("Unknown 
resolve order: " + resolveOrder);
+                       }
+               }
+       }
+
+       /**
+        * Regular URLClassLoader that first loads from the parent and only 
after that form the URLs.
+        */
+       static class ParentFirstClassLoader extends URLClassLoader {
+
+               ParentFirstClassLoader(URL[] urls) {
+                       this(urls, 
FlinkUserCodeClassLoaders.class.getClassLoader());
+               }
+
+               ParentFirstClassLoader(URL[] urls, ClassLoader parent) {
+                       super(urls, parent);
+               }
+       }
+
+       /**
+        * A variant of the URLClassLoader that first loads from the URLs and 
only after that from the parent.
+        *
+        * <p>{@link #getResourceAsStream(String)} uses {@link 
#getResource(String)} internally so we
+        * don't override that.
+        */
+       static final class ChildFirstClassLoader extends URLClassLoader {
+
+               public ChildFirstClassLoader(URL[] urls, ClassLoader parent) {
+                       super(urls, parent);
+               }
+
+               @Override
+               protected synchronized Class<?> loadClass(
+                       String name, boolean resolve) throws 
ClassNotFoundException {
+
+                       // First, check if the class has already been loaded
+                       Class<?> c = findLoadedClass(name);
+
+                       if (c == null) {
+                               try {
+                                       // check the URLs
+                                       c = findClass(name);
+                               } catch (ClassNotFoundException e) {
+                                       // let URLClassLoader do it, which will 
eventually call the parent
+                                       c = super.loadClass(name, resolve);
+                               }
+                       }
+
+                       if (resolve) {
+                               resolveClass(c);
+                       }
+
+                       return c;
+               }
+
+               @Override
+               public URL getResource(String name) {
+                       // first, try and find it via the URLClassloader
+                       URL urlClassLoaderResource = findResource(name);
+
+                       if (urlClassLoaderResource != null) {
+                               return urlClassLoaderResource;
+                       }
+
+                       // delegate to super
+                       return super.getResource(name);
+               }
+
+               @Override
+               public Enumeration<URL> getResources(String name) throws 
IOException {
+                       // first get resources from URLClassloader
+                       Enumeration<URL> urlClassLoaderResources = 
findResources(name);
+
+                       final List<URL> result = new ArrayList<>();
+
+                       while (urlClassLoaderResources.hasMoreElements()) {
+                               
result.add(urlClassLoaderResources.nextElement());
+                       }
+
+                       // get parent urls
+                       Enumeration<URL> parentResources = 
getParent().getResources(name);
+
+                       while (parentResources.hasMoreElements()) {
+                               result.add(parentResources.nextElement());
+                       }
+
+                       return new Enumeration<URL>() {
+                               Iterator<URL> iter = result.iterator();
+
+                               public boolean hasMoreElements() {
+                                       return iter.hasNext();
+                               }
+
+                               public URL nextElement() {
+                                       return iter.next();
+                               }
+                       };
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
index 57aeaff..f7daabb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
@@ -20,10 +20,12 @@ package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.runtime.util.Hardware;
@@ -111,7 +113,11 @@ public class JobManagerServices {
                Preconditions.checkNotNull(config);
                Preconditions.checkNotNull(blobServer);
 
-               final BlobLibraryCacheManager libraryCacheManager = new 
BlobLibraryCacheManager(blobServer);
+               final String classLoaderResolveOrder =
+                       config.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER);
+
+               final BlobLibraryCacheManager libraryCacheManager =
+                       new BlobLibraryCacheManager(blobServer, 
FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder));
 
                final FiniteDuration timeout;
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index b6a0637..859ffbb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -941,7 +941,8 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                                blobServerAddress,
                                taskManagerConfiguration.getConfiguration(),
                                haServices.createBlobStore());
-                       libraryCacheManager = new 
BlobLibraryCacheManager(blobCache);
+                       libraryCacheManager =
+                               new BlobLibraryCacheManager(blobCache, 
taskManagerConfiguration.getClassLoaderResolveOrder());
                } catch (IOException e) {
                        // Can't pass the IOException up - we need a 
RuntimeException anyway
                        // two levels up where this is run asynchronously. 
Also, we don't

http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
index 7c7693b..60dd643 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -23,9 +23,11 @@ import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.util.Preconditions;
 
@@ -58,6 +60,8 @@ public class TaskManagerConfiguration implements 
TaskManagerRuntimeInfo {
 
        private final boolean exitJvmOnOutOfMemory;
 
+       private final FlinkUserCodeClassLoaders.ResolveOrder 
classLoaderResolveOrder;
+
        public TaskManagerConfiguration(
                int numberSlots,
                String[] tmpDirectories,
@@ -68,7 +72,8 @@ public class TaskManagerConfiguration implements 
TaskManagerRuntimeInfo {
                Time refusedRegistrationPause,
                long cleanupInterval,
                Configuration configuration,
-               boolean exitJvmOnOutOfMemory) {
+               boolean exitJvmOnOutOfMemory,
+               FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder) 
{
 
                this.numberSlots = numberSlots;
                this.tmpDirectories = 
Preconditions.checkNotNull(tmpDirectories);
@@ -79,6 +84,7 @@ public class TaskManagerConfiguration implements 
TaskManagerRuntimeInfo {
                this.refusedRegistrationPause = 
Preconditions.checkNotNull(refusedRegistrationPause);
                this.configuration = new 
UnmodifiableConfiguration(Preconditions.checkNotNull(configuration));
                this.exitJvmOnOutOfMemory = exitJvmOnOutOfMemory;
+               this.classLoaderResolveOrder = classLoaderResolveOrder;
        }
 
        public int getNumberSlots() {
@@ -120,6 +126,10 @@ public class TaskManagerConfiguration implements 
TaskManagerRuntimeInfo {
                return exitJvmOnOutOfMemory;
        }
 
+       public FlinkUserCodeClassLoaders.ResolveOrder 
getClassLoaderResolveOrder() {
+               return classLoaderResolveOrder;
+       }
+
        // 
--------------------------------------------------------------------------------------------
        //  Static factory methods
        // 
--------------------------------------------------------------------------------------------
@@ -212,6 +222,10 @@ public class TaskManagerConfiguration implements 
TaskManagerRuntimeInfo {
 
                final boolean exitOnOom = 
configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY);
 
+               final String classLoaderResolveOrder =
+                       
configuration.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER);
+
+
                return new TaskManagerConfiguration(
                        numberSlots,
                        tmpDirPaths,
@@ -222,6 +236,7 @@ public class TaskManagerConfiguration implements 
TaskManagerRuntimeInfo {
                        refusedRegistrationPause,
                        cleanupInterval,
                        configuration,
-                       exitOnOom);
+                       exitOnOom,
+                       
FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder));
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 276e0ff..67ffb32 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -47,6 +47,7 @@ import 
org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceMa
 import org.apache.flink.runtime.clusterframework.types.ResourceID
 import org.apache.flink.runtime.concurrent.{FutureUtils, Executors => 
FlinkExecutors}
 import org.apache.flink.runtime.execution.SuppressRestartsException
+import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders.ResolveOrder
 import 
org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, 
LibraryCacheManager}
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
 import org.apache.flink.runtime.executiongraph._
@@ -2467,6 +2468,8 @@ object JobManager {
 
     val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration)
 
+    val classLoaderResolveOrder = 
configuration.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER)
+
     val restartStrategy = 
RestartStrategyFactory.createRestartStrategyFactory(configuration)
 
     val archiveCount = configuration.getInteger(WebOptions.ARCHIVE_COUNT)
@@ -2497,7 +2500,8 @@ object JobManager {
       blobServer = new BlobServer(configuration, blobStore)
       instanceManager = new InstanceManager()
       scheduler = new 
FlinkScheduler(ExecutionContext.fromExecutor(futureExecutor))
-      libraryCacheManager = new BlobLibraryCacheManager(blobServer)
+      libraryCacheManager =
+        new BlobLibraryCacheManager(blobServer, 
ResolveOrder.fromString(classLoaderResolveOrder))
 
       instanceManager.addInstanceListener(scheduler)
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 7f4308e..cedf607 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.minicluster
 
-import java.net.URL
+import java.net.{URL, URLClassLoader}
 import java.util.UUID
 import java.util.concurrent.{Executors, TimeUnit}
 
@@ -33,7 +33,7 @@ import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.akka.{AkkaJobManagerGateway, AkkaUtils}
 import org.apache.flink.runtime.client.{JobClient, JobExecutionException}
 import org.apache.flink.runtime.concurrent.{FutureUtils, Executors => 
FlinkExecutors}
-import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader
+import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders
 import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, 
HighAvailabilityServicesUtils}
 import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway}
 import org.apache.flink.runtime.jobgraph.JobGraph
@@ -668,7 +668,7 @@ abstract class FlinkMiniCluster(
   private def createUserCodeClassLoader(
       jars: java.util.List[Path],
       classPaths: java.util.List[URL],
-      parentClassLoader: ClassLoader): FlinkUserCodeClassLoader = {
+      parentClassLoader: ClassLoader): URLClassLoader = {
 
     val urls = new Array[URL](jars.size() + classPaths.size())
 
@@ -686,6 +686,6 @@ abstract class FlinkMiniCluster(
       counter += 1
     }
 
-    new FlinkUserCodeClassLoader(urls, parentClassLoader)
+    FlinkUserCodeClassLoaders.parentFirst(urls, parentClassLoader)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 431adb6..94f375a 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -970,7 +970,7 @@ class TaskManager(
           highAvailabilityServices.createBlobStore())
         blobCache = Option(blobcache)
         libraryCacheManager = Some(
-          new BlobLibraryCacheManager(blobcache))
+          new BlobLibraryCacheManager(blobcache, 
config.getClassLoaderResolveOrder()))
       }
       catch {
         case e: Exception =>

http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index a4b48e8..3241b86 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -95,7 +95,7 @@ public class BlobLibraryCacheManagerTest extends TestLogger {
 
                        bc.close();
 
-                       libCache = new BlobLibraryCacheManager(cache);
+                       libCache = new BlobLibraryCacheManager(cache, 
FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST);
                        cache.registerJob(jobId1);
                        cache.registerJob(jobId2);
 
@@ -227,7 +227,7 @@ public class BlobLibraryCacheManagerTest extends TestLogger 
{
 
                        bc.close();
 
-                       libCache = new BlobLibraryCacheManager(cache);
+                       libCache = new BlobLibraryCacheManager(cache, 
FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST);
                        cache.registerJob(jobId);
 
                        assertEquals(0, libCache.getNumberOfManagedJobs());
@@ -341,7 +341,7 @@ public class BlobLibraryCacheManagerTest extends TestLogger 
{
 
                        bc.close();
 
-                       libCache = new BlobLibraryCacheManager(cache);
+                       libCache = new BlobLibraryCacheManager(cache, 
FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST);
                        cache.registerJob(jobId);
 
                        assertEquals(0, libCache.getNumberOfManagedJobs());
@@ -448,7 +448,7 @@ public class BlobLibraryCacheManagerTest extends TestLogger 
{
                        BlobKey dataKey2 = uploader.put(jobId, new byte[]{11, 
12, 13, 14, 15, 16, 17, 18});
                        uploader.close();
 
-                       libCache = new BlobLibraryCacheManager(cache);
+                       libCache = new BlobLibraryCacheManager(cache, 
FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST);
                        assertEquals(0, libCache.getNumberOfManagedJobs());
                        checkFileCountForJob(2, jobId, server);
                        checkFileCountForJob(0, jobId, cache);

http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index e52310e..979b940 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -86,7 +86,7 @@ public class BlobLibraryCacheRecoveryITCase extends 
TestLogger {
                        for (int i = 0; i < server.length; i++) {
                                server[i] = new BlobServer(config, 
blobStoreService);
                                serverAddress[i] = new 
InetSocketAddress("localhost", server[i].getPort());
-                               libServer[i] = new 
BlobLibraryCacheManager(server[i]);
+                               libServer[i] = new 
BlobLibraryCacheManager(server[i], 
FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST);
                        }
 
                        // Random data

http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 72f3a88..d0af88d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -39,6 +39,7 @@ import 
org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import 
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -195,7 +196,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
                                instanceManager,
                                scheduler,
                                blobServer,
-                               new BlobLibraryCacheManager(blobServer),
+                               new BlobLibraryCacheManager(blobServer, 
FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST),
                                archive,
                                new 
FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
                                timeout,
@@ -368,7 +369,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
                                mock(InstanceManager.class),
                                mock(Scheduler.class),
                                blobServer,
-                               new BlobLibraryCacheManager(blobServer),
+                               new BlobLibraryCacheManager(blobServer, 
FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST),
                                ActorRef.noSender(),
                                new 
FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
                                timeout,

http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 64cc13b..0f21b55 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -26,7 +26,7 @@ import 
org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
-import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
+import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
@@ -112,7 +112,7 @@ public class JobMasterTest extends TestLogger {
                                null,
                                mock(OnCompletionActions.class),
                                testingFatalErrorHandler,
-                               new FlinkUserCodeClassLoader(new URL[0]));
+                               FlinkUserCodeClassLoaders.parentFirst(new 
URL[0], JobMasterTest.class.getClassLoader()));
 
                        CompletableFuture<Acknowledge> startFuture = 
jobMaster.start(jobMasterId, testingTimeout);
 
@@ -213,7 +213,7 @@ public class JobMasterTest extends TestLogger {
                                null,
                                mock(OnCompletionActions.class),
                                testingFatalErrorHandler,
-                               new FlinkUserCodeClassLoader(new URL[0]));
+                               FlinkUserCodeClassLoaders.parentFirst(new 
URL[0], JobMasterTest.class.getClassLoader()));
 
                        CompletableFuture<Acknowledge> startFuture = 
jobMaster.start(jobMasterId, testingTimeout);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index b853b14..5ff6022 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.instance.InstanceManager;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
@@ -189,7 +190,7 @@ public class JobManagerLeaderElectionTest extends 
TestLogger {
                        new InstanceManager(),
                        new Scheduler(TestingUtils.defaultExecutionContext()),
                        blobServer,
-                       new BlobLibraryCacheManager(blobServer),
+                       new BlobLibraryCacheManager(blobServer, 
FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST),
                        ActorRef.noSender(),
                        new NoRestartStrategy.NoRestartStrategyFactory(),
                        AkkaUtils.getDefaultTimeoutAsFiniteDuration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 410b93e..052699a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import 
org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -121,7 +122,8 @@ public class TaskManagerComponentsStartupShutdownTest 
extends TestLogger {
                                Time.seconds(10),
                                1000000, // cleanup interval
                                config,
-                               false); // exit-jvm-on-fatal-error
+                               false, // exit-jvm-on-fatal-error
+                               
FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST);
 
                        final int networkBufNum = 32;
                        // note: the network buffer memory configured here is 
not actually used below but set

http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 229e93d..567f8f1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -68,6 +68,7 @@ under the License.
                <module>flink-examples</module>
                <module>flink-clients</module>
                <module>flink-tests</module>
+               <module>flink-end-to-end-tests</module>
                <module>flink-test-utils-parent</module>
                <module>flink-libraries</module>
                <module>flink-scala-shell</module>

http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/test-infra/end-to-end-test/test_streaming_classloader.sh
----------------------------------------------------------------------
diff --git a/test-infra/end-to-end-test/test_streaming_classloader.sh 
b/test-infra/end-to-end-test/test_streaming_classloader.sh
new file mode 100755
index 0000000..efbf98e
--- /dev/null
+++ b/test-infra/end-to-end-test/test_streaming_classloader.sh
@@ -0,0 +1,99 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+
+set -e
+set -o pipefail
+
+# Convert relative path to absolute path
+TEST_ROOT=`pwd`
+TEST_INFRA_DIR="$0"
+TEST_INFRA_DIR=`dirname "$TEST_INFRA_DIR"`
+cd $TEST_INFRA_DIR
+TEST_INFRA_DIR=`pwd`
+cd $TEST_ROOT
+
+. "$TEST_INFRA_DIR"/common.sh
+
+TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/target/ClassLoaderTestProgram.jar
+
+# kill any remaining JobManagers/TaskManagers at the end
+trap 'pkill -f "JobManager|TaskManager"' EXIT
+
+echo "Testing parent-first class loading"
+
+# retrieve git.remote.origin.url from .version.properties
+GIT_REMOTE_URL=`grep "git\.remote\.origin\.url" 
$TEST_INFRA_DIR/../../flink-runtime/src/main/resources/.version.properties \
+  |cut -d'=' -f2 \
+  |sed -e 's/\\\:/:/g'`
+
+# remove any leftover classloader settings
+sed -i -e 's/classloader.resolve-order: .*//' "$FLINK_DIR/conf/flink-conf.yaml"
+echo "classloader.resolve-order: parent-first" >> 
"$FLINK_DIR/conf/flink-conf.yaml"
+
+start_cluster
+
+$FLINK_DIR/bin/flink run -p 1 $TEST_PROGRAM_JAR --resolve-order parent-first 
--output $TEST_DATA_DIR/out/cl_out_pf
+
+stop_cluster
+
+# remove classloader settings again
+sed -i -e 's/classloader.resolve-order: .*//' $FLINK_DIR/conf/flink-conf.yaml
+
+OUTPUT=`cat $TEST_DATA_DIR/out/cl_out_pf`
+# first field: whether we found the method on TaskManager
+# result of getResource(".version.properties"), should be from the parent
+# ordered result of getResources(".version.properties"), should have parent 
first
+EXPECTED="NoSuchMethodError:${GIT_REMOTE_URL}:${GIT_REMOTE_URL}hello-there-42"
+if [[ "$OUTPUT" != "$EXPECTED" ]]; then
+  echo "Output from Flink program does not match expected output."
+  echo -e "EXPECTED: $EXPECTED"
+  echo -e "ACTUAL: $OUTPUT"
+  PASS=""
+fi
+
+echo "Testing child-first class loading"
+
+# remove any leftover classloader settings
+sed -i -e 's/classloader.resolve-order: .*//' "$FLINK_DIR/conf/flink-conf.yaml"
+echo "classloader.resolve-order: child-first" >> 
"$FLINK_DIR/conf/flink-conf.yaml"
+
+start_cluster
+
+$FLINK_DIR/bin/flink run -p 1 $TEST_PROGRAM_JAR --resolve-order child-first 
--output $TEST_DATA_DIR/out/cl_out_cf
+
+stop_cluster
+
+# remove classloader settings again
+sed -i -e 's/classloader.resolve-order: .*//' $FLINK_DIR/conf/flink-conf.yaml
+
+OUTPUT=`cat $TEST_DATA_DIR/out/cl_out_cf`
+# first field: whether we found the method on TaskManager
+# result of getResource(".version.properties"), should be from the child
+# ordered result of getResources(".version.properties"), should be child first
+EXPECTED="Hello, World!:hello-there-42:hello-there-42${GIT_REMOTE_URL}"
+if [[ "$OUTPUT" != "$EXPECTED" ]]; then
+  echo "Output from Flink program does not match expected output."
+  echo -e "EXPECTED: $EXPECTED"
+  echo -e "ACTUAL: $OUTPUT"
+  PASS=""
+fi
+
+clean_data_dir
+check_all_pass

http://git-wip-us.apache.org/repos/asf/flink/blob/a86b6468/tools/travis_mvn_watchdog.sh
----------------------------------------------------------------------
diff --git a/tools/travis_mvn_watchdog.sh b/tools/travis_mvn_watchdog.sh
index 5232c64..77bad24 100755
--- a/tools/travis_mvn_watchdog.sh
+++ b/tools/travis_mvn_watchdog.sh
@@ -423,6 +423,12 @@ case $TEST in
                        printf 
"==============================================================================\n"
                        test-infra/end-to-end-test/test_streaming_kafka010.sh 
build-target cluster
                        EXIT_CODE=$(($EXIT_CODE+$?))
+
+                       printf 
"\n==============================================================================\n"
+                       printf "Running class loading end-to-end test\n"
+                       printf 
"==============================================================================\n"
+                       
test-infra/end-to-end-test/test_streaming_classloader.sh build-target cluster
+                       EXIT_CODE=$(($EXIT_CODE+$?))
                else
                        printf 
"\n==============================================================================\n"
                        printf "Previous build failure detected, skipping 
end-to-end tests.\n"

Reply via email to