This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/apex-core.git
The following commit(s) were added to refs/heads/master by this push: new b3911c3 APEXCORE-767 Set parent classloader in StramAppLauncher loadDependencies b3911c3 is described below commit b3911c3515951685bc47ff77b2bf22dadb18f1b3 Author: Florian Schmidt <florian.schmidt.1...@icloud.com> AuthorDate: Wed Oct 11 16:34:17 2017 -0700 APEXCORE-767 Set parent classloader in StramAppLauncher loadDependencies StramAppLauncher.loadDependencies is called multiple times when starting an application via the apex-cli with the -local option. In each of the calls to loadDependencies, the contextClassLoader of the current thread would be replaced with a new instance of URLClassLoader (which has no parent class loader set). This can lead to issues, e.g. when one aquires the current contextClassLauncher, loads a class with it and tries to cast it to a class which was loaded with a previous version of the contextClassLoader, resulting in a ClassCastException. An example of this bug can be seen in APEXMALHAR-2511 The changes in this commit fix this by passing the parent class loader for each new instance of URLClassLoader to the current contextClassLoader --- .../java/com/datatorrent/stram/cli/ApexCli.java | 9 +- .../com/datatorrent/stram/client/AppPackage.java | 7 +- .../datatorrent/stram/client/StramAppLauncher.java | 34 ++++- .../apache/apex/engine/YarnAppLauncherImpl.java | 1 + .../com/datatorrent/stram/client/AsyncTester.java | 56 +++++++++ .../stram/client/StramAppLauncherTest.java | 137 +++++++++++++++++++++ 6 files changed, 237 insertions(+), 7 deletions(-) diff --git a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java index 2451b10..a4ef3f5 100644 --- a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java +++ b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java @@ -2151,7 +2151,7 @@ public class ApexCli } else { System.err.println("No application specified."); } - + submitApp.resetContextClassLoader(); } finally { IOUtils.closeQuietly(cp); } @@ -2911,8 +2911,10 @@ public class ApexCli submitApp.loadDependencies(); List<AppFactory> matchingAppFactories = getMatchingAppFactories(submitApp, appName, commandLineInfo.exactMatch); if (matchingAppFactories == null || matchingAppFactories.isEmpty()) { + submitApp.resetContextClassLoader(); throw new CliException("No application in jar file matches '" + appName + "'"); } else if (matchingAppFactories.size() > 1) { + submitApp.resetContextClassLoader(); throw new CliException("More than one application in jar file match '" + appName + "'"); } else { Map<String, Object> map = new HashMap<>(); @@ -2940,6 +2942,7 @@ public class ApexCli } } printJson(map); + submitApp.resetContextClassLoader(); } } else { if (filename.endsWith(".json")) { @@ -2971,6 +2974,7 @@ public class ApexCli appList.add(m); } printJson(appList, "applications"); + submitApp.resetContextClassLoader(); } } } else { @@ -3200,8 +3204,10 @@ public class ApexCli submitApp.loadDependencies(); List<AppFactory> matchingAppFactories = getMatchingAppFactories(submitApp, appName, true); if (matchingAppFactories == null || matchingAppFactories.isEmpty()) { + submitApp.resetContextClassLoader(); throw new CliException("No application in jar file matches '" + appName + "'"); } else if (matchingAppFactories.size() > 1) { + submitApp.resetContextClassLoader(); throw new CliException("More than one application in jar file match '" + appName + "'"); } else { AppFactory appFactory = matchingAppFactories.get(0); @@ -3211,6 +3217,7 @@ public class ApexCli file.createNewFile(); } LogicalPlanSerializer.convertToProperties(logicalPlan).save(file); + submitApp.resetContextClassLoader(); } } else { if (currentApp == null) { diff --git a/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java b/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java index a606b06..5e03438 100644 --- a/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java +++ b/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java @@ -465,8 +465,9 @@ public class AppPackage implements Closeable if (entry.getName().endsWith(".jar") && !skipJars) { appJars.add(entry.getName()); + StramAppLauncher stramAppLauncher = null; try { - StramAppLauncher stramAppLauncher = new StramAppLauncher(entry, config); + stramAppLauncher = new StramAppLauncher(entry, config); stramAppLauncher.loadDependencies(); List<AppFactory> appFactories = stramAppLauncher.getBundledTopologies(); for (AppFactory appFactory : appFactories) { @@ -486,6 +487,10 @@ public class AppPackage implements Closeable } } catch (Exception ex) { LOG.error("Caught exception trying to process {}", entry.getName(), ex); + } finally { + if (stramAppLauncher != null) { + stramAppLauncher.resetContextClassLoader(); + } } } } diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java index d4f0170..2019f48 100644 --- a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java +++ b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java @@ -106,6 +106,9 @@ public class StramAppLauncher private LinkedHashSet<File> deployJars; private final StringWriter mvnBuildClasspathOutput = new StringWriter(); + private ClassLoader initialClassLoader; + private Thread loaderThread; + public interface AppFactory { LogicalPlan createApp(LogicalPlanConfiguration conf); @@ -220,7 +223,6 @@ public class StramAppLauncher } } - public StramAppLauncher(File appJarFile, Configuration conf) throws Exception { this.jarFile = appJarFile; @@ -535,10 +537,32 @@ public class StramAppLauncher public URLClassLoader loadDependencies() { - URLClassLoader cl = URLClassLoader.newInstance(launchDependencies.toArray(new URL[launchDependencies.size()])); - Thread.currentThread().setContextClassLoader(cl); - StringCodecs.check(); - return cl; + if (this.loaderThread == null && this.initialClassLoader == null) { + this.loaderThread = Thread.currentThread(); + this.initialClassLoader = Thread.currentThread().getContextClassLoader(); + } + + if (Thread.currentThread() != this.loaderThread) { + throw new RuntimeException("Calls to loadDependencies can only be made on the same thread that loadDependencies was called on for the first time"); + } else { + URL[] dependencies = launchDependencies.toArray(new URL[launchDependencies.size()]); + + ClassLoader currentContextClassLoader = Thread.currentThread().getContextClassLoader(); + URLClassLoader cl = URLClassLoader.newInstance(dependencies, currentContextClassLoader); + Thread.currentThread().setContextClassLoader(cl); + + StringCodecs.check(); + return cl; + } + } + + public void resetContextClassLoader() + { + if (Thread.currentThread() != this.loaderThread) { + throw new RuntimeException("Calls to resetContextClassLoader can only be made on the same thread that loadDependencies was called on for the first time"); + } + + Thread.currentThread().setContextClassLoader(initialClassLoader); } private void setTokenRefreshCredentials(LogicalPlan dag, Configuration conf) throws IOException diff --git a/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java b/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java index 9a69b08..7b43ab5 100644 --- a/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java +++ b/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java @@ -80,6 +80,7 @@ public class YarnAppLauncherImpl extends YarnAppLauncher<YarnAppLauncherImpl.Yar } }; ApplicationId appId = appLauncher.launchApp(appFactory); + appLauncher.resetContextClassLoader(); return new YarnAppHandleImpl(appId, conf); } catch (Exception ex) { throw new LauncherException(ex); diff --git a/engine/src/test/java/com/datatorrent/stram/client/AsyncTester.java b/engine/src/test/java/com/datatorrent/stram/client/AsyncTester.java new file mode 100644 index 0000000..e8d692e --- /dev/null +++ b/engine/src/test/java/com/datatorrent/stram/client/AsyncTester.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.stram.client; + +// See https://stackoverflow.com/a/2596530 +public class AsyncTester +{ + private Thread thread; + private volatile AssertionError error; + + public AsyncTester(final Runnable runnable) + { + thread = new Thread(new Runnable() + { + @Override + public void run() + { + try { + runnable.run(); + } catch (AssertionError e) { + error = e; + } + } + }); + } + + public AsyncTester start() + { + thread.start(); + return this; + } + + public void test() throws AssertionError, InterruptedException + { + thread.join(); + if (error != null) { + throw error; + } + } +} diff --git a/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java b/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java index 2069bab..e31a6dd 100644 --- a/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java +++ b/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java @@ -20,6 +20,8 @@ package com.datatorrent.stram.client; import java.io.File; import java.io.IOException; +import java.net.URL; +import java.util.LinkedHashSet; import org.junit.Assert; import org.junit.Rule; @@ -55,6 +57,141 @@ public class StramAppLauncherTest @PrepareForTest({StramAppLauncher.class}) @PowerMockIgnore({"javax.xml.*", "org.w3c.*", "org.apache.hadoop.*", "org.apache.log4j.*"}) + public static class LoadDependenciesTest + { + + @Rule + public PowerMockRule rule = new PowerMockRule(); + + @Rule + public TestWatcher setup = new TestWatcher() + { + @Override + protected void starting(Description description) + { + super.starting(description); + suppress(method(StramAppLauncher.class, "init")); + } + }; + + @Test + public void testLoadDependenciesSetsParentClassLoader() throws Exception + { + // Setup + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.newInstance(conf); + StramAppLauncher appLauncher = new StramAppLauncher(fs, conf); + + Whitebox.setInternalState(appLauncher, "launchDependencies", new LinkedHashSet<URL>()); + + // Get initial contextClassLoader + ClassLoader initialClassLoader = Thread.currentThread().getContextClassLoader(); + + appLauncher.loadDependencies(); + + // Make sure that new contextClassLoader has initialClassLoader as parent + ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader(); + + Assert.assertSame(initialClassLoader, currentClassLoader.getParent()); + } + + @Test + public void testResetContextClassLoaderResetsToInitialClassLoader() throws Exception + { + // Setup + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.newInstance(conf); + StramAppLauncher appLauncher = new StramAppLauncher(fs, conf); + + Whitebox.setInternalState(appLauncher, "launchDependencies", new LinkedHashSet<URL>()); + + // Get initial contextClassLoader + ClassLoader initialClassLoader = Thread.currentThread().getContextClassLoader(); + + appLauncher.loadDependencies(); + appLauncher.resetContextClassLoader(); + + ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader(); + Assert.assertSame(initialClassLoader, currentClassLoader); + } + + @Test + public void testResetContextClassloaderOnlyOnInitialThread() throws Exception + { + // Setup + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.newInstance(conf); + final StramAppLauncher appLauncher = new StramAppLauncher(fs, conf); + Whitebox.setInternalState(appLauncher, "launchDependencies", new LinkedHashSet<URL>()); + + new AsyncTester(new Runnable() + { + @Override + public void run() + { + try { + appLauncher.loadDependencies(); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + }).start().test(); + + new AsyncTester(new Runnable() + { + @Override + public void run() + { + try { + appLauncher.resetContextClassLoader(); + Assert.fail("An exception should be thrown"); + } catch (RuntimeException e) { + // catch as expected + } + } + }).start().test(); + } + + @Test + public void testLoadDependenciesOnlyOnInitialThread() throws Exception + { + // Setup + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.newInstance(conf); + final StramAppLauncher appLauncher = new StramAppLauncher(fs, conf); + Whitebox.setInternalState(appLauncher, "launchDependencies", new LinkedHashSet<URL>()); + + new AsyncTester(new Runnable() + { + @Override + public void run() + { + try { + appLauncher.loadDependencies(); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + }).start().test(); + + new AsyncTester(new Runnable() + { + @Override + public void run() + { + try { + appLauncher.loadDependencies(); + Assert.fail("An exception should be thrown"); + } catch (RuntimeException e) { + // catch as expected + } + } + }).start().test(); + } + } + + @PrepareForTest({StramAppLauncher.class}) + @PowerMockIgnore({"javax.xml.*", "org.w3c.*", "org.apache.hadoop.*", "org.apache.log4j.*"}) public static class RefreshTokenTests { File workspace; -- To stop receiving notification emails like this one, please contact ['"commits@apex.apache.org" <commits@apex.apache.org>'].