Repository: incubator-apex-core Updated Branches: refs/heads/master 20607b59a -> 37e4c74ec
APEXCORE-304 Add support for external jars to be added to DAG as dependency. Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/f71bb0ed Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/f71bb0ed Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/f71bb0ed Branch: refs/heads/master Commit: f71bb0edfd5e10fdf55343a151e0a41e2c4cba7a Parents: c2008f2 Author: chinmaykolhatkar <chin...@datatorrent.com> Authored: Thu Apr 21 15:39:49 2016 +0530 Committer: chinmaykolhatkar <chin...@datatorrent.com> Committed: Fri Apr 29 10:48:24 2016 +0530 ---------------------------------------------------------------------- .../main/java/com/datatorrent/api/Context.java | 8 ++ .../stram/LaunchContainerRunnable.java | 2 +- .../com/datatorrent/stram/LocalModeImpl.java | 29 +++++ .../java/com/datatorrent/stram/StramClient.java | 5 +- .../stram/StreamingContainerManager.java | 2 +- .../stram/client/StramAppLauncher.java | 18 ++- .../stram/plan/logical/LogicalPlan.java | 11 +- .../stram/StramLocalClusterTest.java | 122 +++++++++++++++++++ engine/src/test/resources/dynamicJar/POJO.java | 49 ++++++++ 9 files changed, 231 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f71bb0ed/api/src/main/java/com/datatorrent/api/Context.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java index 7e19a56..a0f3ad3 100644 --- a/api/src/main/java/com/datatorrent/api/Context.java +++ b/api/src/main/java/com/datatorrent/api/Context.java @@ -519,6 +519,14 @@ public interface Context */ Attribute<AffinityRulesSet> AFFINITY_RULES_SET = new Attribute<AffinityRulesSet>(new JsonStringCodec<AffinityRulesSet>(AffinityRulesSet.class)); + /** 
+ * Comma separated list of jar file dependencies to be deployed with the application. + * The launcher will combine the list with built-in dependencies and those specified + * that are made available through the distributed file system to application master + * and child containers. + */ + Attribute<String> LIBRARY_JARS = new Attribute<>(new StringCodec.String2String()); + @SuppressWarnings(value = "FieldNameHidesFieldInSuperclass") long serialVersionUID = AttributeMap.AttributeInitializer.initialize(DAGContext.class); } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f71bb0ed/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java b/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java index 8a8db79..e9dd72b 100644 --- a/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java +++ b/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java @@ -168,7 +168,7 @@ public class LaunchContainerRunnable implements Runnable try { // child VM dependencies try (FileSystem fs = StramClientUtils.newFileSystemInstance(nmClient.getConfig())) { - addFilesToLocalResources(LocalResourceType.FILE, dag.getAttributes().get(LogicalPlan.LIBRARY_JARS), localResources, fs); + addFilesToLocalResources(LocalResourceType.FILE, dag.getAttributes().get(Context.DAGContext.LIBRARY_JARS), localResources, fs); String archives = dag.getAttributes().get(LogicalPlan.ARCHIVES); if (archives != null) { addFilesToLocalResources(LocalResourceType.ARCHIVE, archives, localResources, fs); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f71bb0ed/engine/src/main/java/com/datatorrent/stram/LocalModeImpl.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/LocalModeImpl.java 
b/engine/src/main/java/com/datatorrent/stram/LocalModeImpl.java index e51af6b..3fedc7c 100644 --- a/engine/src/main/java/com/datatorrent/stram/LocalModeImpl.java +++ b/engine/src/main/java/com/datatorrent/stram/LocalModeImpl.java @@ -18,8 +18,14 @@ */ package com.datatorrent.stram; +import java.io.File; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; + import org.apache.hadoop.conf.Configuration; +import com.datatorrent.api.Context; import com.datatorrent.api.DAG; import com.datatorrent.api.LocalMode; import com.datatorrent.api.StreamingApplication; @@ -66,9 +72,32 @@ public class LocalModeImpl extends LocalMode public Controller getController() { try { + addLibraryJarsToClasspath(lp); return new StramLocalCluster(lp); } catch (Exception e) { throw new RuntimeException("Error creating local cluster", e); } } + + private void addLibraryJarsToClasspath(LogicalPlan lp) throws MalformedURLException + { + String libJarsCsv = lp.getAttributes().get(Context.DAGContext.LIBRARY_JARS); + + if (libJarsCsv != null && libJarsCsv.length() != 0) { + String[] split = libJarsCsv.split(StramClient.LIB_JARS_SEP); + if (split.length != 0) { + URL[] urlList = new URL[split.length]; + for (int i = 0; i < split.length; i++) { + File file = new File(split[i]); + urlList[i] = file.toURI().toURL(); + } + + // Set class loader. 
+ ClassLoader prevCl = Thread.currentThread().getContextClassLoader(); + URLClassLoader cl = URLClassLoader.newInstance(urlList, prevCl); + Thread.currentThread().setContextClassLoader(cl); + } + } + + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f71bb0ed/engine/src/main/java/com/datatorrent/stram/StramClient.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java b/engine/src/main/java/com/datatorrent/stram/StramClient.java index c1dfffd..daca67e 100644 --- a/engine/src/main/java/com/datatorrent/stram/StramClient.java +++ b/engine/src/main/java/com/datatorrent/stram/StramClient.java @@ -74,6 +74,7 @@ import org.apache.log4j.DTLoggerFactory; import com.google.common.base.Objects; import com.google.common.collect.Lists; +import com.datatorrent.api.Context; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.StorageAgent; import com.datatorrent.common.util.AsyncFSStorageAgent; @@ -223,7 +224,7 @@ public class StramClient localJarFiles.add(jar); } - String libJarsPath = dag.getValue(LogicalPlan.LIBRARY_JARS); + String libJarsPath = dag.getValue(Context.DAGContext.LIBRARY_JARS); if (!StringUtils.isEmpty(libJarsPath)) { String[] libJars = StringUtils.splitByWholeSeparator(libJarsPath, LIB_JARS_SEP); localJarFiles.addAll(Arrays.asList(libJars)); @@ -442,7 +443,7 @@ public class StramClient String libJarsCsv = copyFromLocal(fs, appPath, localJarFiles.toArray(new String[]{})); LOG.info("libjars: {}", libJarsCsv); - dag.getAttributes().put(LogicalPlan.LIBRARY_JARS, libJarsCsv); + dag.getAttributes().put(Context.DAGContext.LIBRARY_JARS, libJarsCsv); LaunchContainerRunnable.addFilesToLocalResources(LocalResourceType.FILE, libJarsCsv, localResources, fs); if (archives != null) { http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f71bb0ed/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java 
---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java index 6a8ee9c..37f63b2 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java @@ -3131,7 +3131,7 @@ public class StreamingContainerManager implements PlanContext lp.setAttribute(LogicalPlan.APPLICATION_ID, appId); lp.setAttribute(LogicalPlan.APPLICATION_PATH, newApp.assertAppPath()); - lp.setAttribute(LogicalPlan.LIBRARY_JARS, newApp.getValue(LogicalPlan.LIBRARY_JARS)); + lp.setAttribute(Context.DAGContext.LIBRARY_JARS, newApp.getValue(Context.DAGContext.LIBRARY_JARS)); lp.setAttribute(LogicalPlan.ARCHIVES, newApp.getValue(LogicalPlan.ARCHIVES)); this.finals = new FinalVars(finals, lp); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f71bb0ed/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java index a1197a5..bd58e35 100644 --- a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java +++ b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java @@ -285,7 +285,7 @@ public class StramAppLauncher // don't rely on object deserialization for changing the app id in the future. 
try { JSONObject attributes = metaJson.getJSONObject("attributes"); - originalLibJars = attributes.getString(LogicalPlan.LIBRARY_JARS.getSimpleName()); + originalLibJars = attributes.getString(Context.DAGContext.LIBRARY_JARS.getSimpleName()); recoveryAppName = attributes.getString(Context.DAGContext.APPLICATION_NAME.getSimpleName()); } catch (JSONException ex) { recoveryAppName = "Recovery App From " + originalAppId; @@ -532,10 +532,22 @@ public class StramAppLauncher */ public void runLocal(AppFactory appConfig) throws Exception { + propertiesBuilder.conf.setEnum(StreamingApplication.ENVIRONMENT, StreamingApplication.Environment.LOCAL); + LogicalPlan lp = appConfig.createApp(propertiesBuilder); + + String libJarsCsv = lp.getAttributes().get(Context.DAGContext.LIBRARY_JARS); + if (libJarsCsv != null && libJarsCsv.length() != 0) { + String[] split = libJarsCsv.split(StramClient.LIB_JARS_SEP); + for (String jarPath : split) { + File file = new File(jarPath); + URL url = file.toURI().toURL(); + launchDependencies.add(url); + } + } + // local mode requires custom classes to be resolved through the context class loader loadDependencies(); - propertiesBuilder.conf.setEnum(StreamingApplication.ENVIRONMENT, StreamingApplication.Environment.LOCAL); - StramLocalCluster lc = new StramLocalCluster(appConfig.createApp(propertiesBuilder)); + StramLocalCluster lc = new StramLocalCluster(lp); lc.run(); } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f71bb0ed/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java index e3f6987..15969b7 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java @@ -61,6 +61,8 @@ 
import javax.validation.constraints.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import org.apache.commons.io.input.ClassLoaderObjectInputStream; import org.apache.commons.lang.builder.ToStringBuilder; import org.apache.commons.lang.builder.ToStringStyle; import org.apache.hadoop.conf.Configuration; @@ -158,13 +160,6 @@ public class LogicalPlan implements Serializable, DAG public static Attribute<String> KEY_TAB_FILE = new Attribute<>((String)null, new StringCodec.String2String()); public static Attribute<Double> TOKEN_REFRESH_ANTICIPATORY_FACTOR = new Attribute<>(0.7); /** - * Comma separated list of jar file dependencies to be deployed with the application. - * The launcher will combine the list with built-in dependencies and those specified - * that are made available through the distributed file system to application master - * and child containers. - */ - public static Attribute<String> LIBRARY_JARS = new Attribute<>(new StringCodec.String2String()); - /** * Comma separated list of archives to be deployed with the application. 
* The launcher will include the archives into the final set of resources * that are made available through the distributed file system to application master @@ -2386,7 +2381,7 @@ public class LogicalPlan implements Serializable, DAG public static LogicalPlan read(InputStream is) throws IOException, ClassNotFoundException { - return (LogicalPlan)new ObjectInputStream(is).readObject(); + return (LogicalPlan)new ClassLoaderObjectInputStream(Thread.currentThread().getContextClassLoader(), is).readObject(); } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f71bb0ed/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java b/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java index e620141..1a5046c 100644 --- a/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java @@ -22,8 +22,11 @@ import java.io.File; import java.io.FileReader; import java.io.IOException; import java.io.LineNumberReader; +import java.net.URL; +import java.net.URLClassLoader; import java.util.Arrays; import java.util.Map; +import java.util.concurrent.CountDownLatch; import org.junit.After; import org.junit.Assert; @@ -33,7 +36,16 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; import com.datatorrent.common.util.AsyncFSStorageAgent; +import com.datatorrent.common.util.BaseOperator; import com.datatorrent.stram.StramLocalCluster.LocalStreamingContainer; import 
com.datatorrent.stram.StramLocalCluster.MockComponentFactory; import com.datatorrent.stram.api.Checkpoint; @@ -48,6 +60,7 @@ import com.datatorrent.stram.plan.physical.PTOperator; import com.datatorrent.stram.support.ManualScheduledExecutorService; import com.datatorrent.stram.support.StramTestSupport; + public class StramLocalClusterTest { private static final Logger LOG = LoggerFactory.getLogger(StramLocalClusterTest.class); @@ -274,4 +287,113 @@ public class StramLocalClusterTest localCluster.shutdown(); } + @Test + public void testDynamicLoading() throws Exception + { + String generatedJar = generatejar("POJO"); + URLClassLoader uCl = URLClassLoader.newInstance(new URL[] {new File(generatedJar).toURI().toURL()}); + Class<?> pojo = uCl.loadClass("POJO"); + + DynamicLoaderApp app = new DynamicLoaderApp(); + app.generatedJar = generatedJar; + app.pojo = pojo; + + LocalMode lma = LocalMode.newInstance(); + lma.prepareDAG(app, new Configuration()); + LocalMode.Controller lc = lma.getController(); + lc.runAsync(); + DynamicLoaderApp.latch.await(); + Assert.assertTrue(DynamicLoaderApp.passed); + lc.shutdown(); + } + + static class DynamicLoaderApp implements StreamingApplication + { + static boolean passed = false; + static CountDownLatch latch = new CountDownLatch(2); + + DynamicLoader test; + String generatedJar; + Class<?> pojo; + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + TestGeneratorInputOperator input = dag.addOperator("Input", new TestGeneratorInputOperator()); + test = dag.addOperator("Test", new DynamicLoader()); + + dag.addStream("S1", input.outport, test.input); + dag.setAttribute(Context.DAGContext.LIBRARY_JARS, generatedJar); + dag.setInputPortAttribute(test.input, Context.PortContext.TUPLE_CLASS, pojo); + } + } + + static class DynamicLoader extends BaseOperator + { + public final transient DefaultInputPort input = new DefaultInputPort() + { + @Override + public void setup(Context.PortContext context) + { + Class<?> value = 
context.getValue(Context.PortContext.TUPLE_CLASS); + if (value.getName().equals("POJO")) { + DynamicLoaderApp.passed = true; + } else { + DynamicLoaderApp.passed = false; + } + DynamicLoaderApp.latch.countDown(); + } + + @Override + public void process(Object tuple) + { + } + }; + + @Override + public void setup(Context.OperatorContext context) + { + ClassLoader cl = Thread.currentThread().getContextClassLoader(); + try { + cl.loadClass("POJO"); + } catch (ClassNotFoundException e) { + DynamicLoaderApp.passed = false; + DynamicLoaderApp.latch.countDown(); + throw new RuntimeException(e); + } + + try { + Class.forName("POJO", true, Thread.currentThread().getContextClassLoader()); + } catch (ClassNotFoundException e) { + DynamicLoaderApp.passed = false; + DynamicLoaderApp.latch.countDown(); + throw new RuntimeException(e); + } + + DynamicLoaderApp.passed = true; + DynamicLoaderApp.latch.countDown(); + } + } + + private String generatejar(String pojoClassName) throws IOException, InterruptedException + { + String sourceDir = "src/test/resources/dynamicJar/"; + String destDir = testMeta.getPath(); + + Process p = Runtime.getRuntime() + .exec(new String[] {"javac", "-d", destDir, sourceDir + pojoClassName + ".java"}, null, null); + IOUtils.copy(p.getInputStream(), System.out); + IOUtils.copy(p.getErrorStream(), System.err); + Assert.assertEquals(0, p.waitFor()); + + p = Runtime.getRuntime() + .exec(new String[] {"jar", "-cf", pojoClassName + ".jar", pojoClassName + ".class"}, null, new File(destDir)); + IOUtils.copy(p.getInputStream(), System.out); + IOUtils.copy(p.getErrorStream(), System.err); + Assert.assertEquals(0, p.waitFor()); + + return new File(destDir, pojoClassName + ".jar").getAbsolutePath(); + } + + } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f71bb0ed/engine/src/test/resources/dynamicJar/POJO.java ---------------------------------------------------------------------- diff --git a/engine/src/test/resources/dynamicJar/POJO.java 
b/engine/src/test/resources/dynamicJar/POJO.java new file mode 100644 index 0000000..8868544 --- /dev/null +++ b/engine/src/test/resources/dynamicJar/POJO.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * This file is not directly used anywhere. This will be compiled and packaged as a jar by the unit test to verify + * dynamic loading. + */ +public class POJO +{ + private int a; + private String b; + + public int getA() + { + return a; + } + + public void setA(int a) + { + this.a = a; + } + + public String getB() + { + return b; + } + + public void setB(String b) + { + this.b = b; + } +} +