Repository: incubator-apex-core
Updated Branches:
  refs/heads/master 20607b59a -> 37e4c74ec


APEXCORE-304 Add support for external jars to be added to DAG as dependency.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/f71bb0ed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/f71bb0ed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/f71bb0ed

Branch: refs/heads/master
Commit: f71bb0edfd5e10fdf55343a151e0a41e2c4cba7a
Parents: c2008f2
Author: chinmaykolhatkar <chin...@datatorrent.com>
Authored: Thu Apr 21 15:39:49 2016 +0530
Committer: chinmaykolhatkar <chin...@datatorrent.com>
Committed: Fri Apr 29 10:48:24 2016 +0530

----------------------------------------------------------------------
 .../main/java/com/datatorrent/api/Context.java  |   8 ++
 .../stram/LaunchContainerRunnable.java          |   2 +-
 .../com/datatorrent/stram/LocalModeImpl.java    |  29 +++++
 .../java/com/datatorrent/stram/StramClient.java |   5 +-
 .../stram/StreamingContainerManager.java        |   2 +-
 .../stram/client/StramAppLauncher.java          |  18 ++-
 .../stram/plan/logical/LogicalPlan.java         |  11 +-
 .../stram/StramLocalClusterTest.java            | 122 +++++++++++++++++++
 engine/src/test/resources/dynamicJar/POJO.java  |  49 ++++++++
 9 files changed, 231 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f71bb0ed/api/src/main/java/com/datatorrent/api/Context.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/Context.java 
b/api/src/main/java/com/datatorrent/api/Context.java
index 7e19a56..a0f3ad3 100644
--- a/api/src/main/java/com/datatorrent/api/Context.java
+++ b/api/src/main/java/com/datatorrent/api/Context.java
@@ -519,6 +519,14 @@ public interface Context
      */
     Attribute<AffinityRulesSet> AFFINITY_RULES_SET = new 
Attribute<AffinityRulesSet>(new 
JsonStringCodec<AffinityRulesSet>(AffinityRulesSet.class));
 
+    /**
+     * Comma separated list of jar file dependencies to be deployed with the 
application.
+     * The launcher will combine the list with built-in dependencies and those 
specified
+     * that are made available through the distributed file system to 
application master
+     * and child containers.
+     */
+    Attribute<String> LIBRARY_JARS = new Attribute<>(new 
StringCodec.String2String());
+
     @SuppressWarnings(value = "FieldNameHidesFieldInSuperclass")
     long serialVersionUID = 
AttributeMap.AttributeInitializer.initialize(DAGContext.class);
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f71bb0ed/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java 
b/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java
index 8a8db79..e9dd72b 100644
--- a/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java
+++ b/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java
@@ -168,7 +168,7 @@ public class LaunchContainerRunnable implements Runnable
     try {
       // child VM dependencies
       try (FileSystem fs = 
StramClientUtils.newFileSystemInstance(nmClient.getConfig())) {
-        addFilesToLocalResources(LocalResourceType.FILE, 
dag.getAttributes().get(LogicalPlan.LIBRARY_JARS), localResources, fs);
+        addFilesToLocalResources(LocalResourceType.FILE, 
dag.getAttributes().get(Context.DAGContext.LIBRARY_JARS), localResources, fs);
         String archives = dag.getAttributes().get(LogicalPlan.ARCHIVES);
         if (archives != null) {
           addFilesToLocalResources(LocalResourceType.ARCHIVE, archives, 
localResources, fs);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f71bb0ed/engine/src/main/java/com/datatorrent/stram/LocalModeImpl.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/LocalModeImpl.java 
b/engine/src/main/java/com/datatorrent/stram/LocalModeImpl.java
index e51af6b..3fedc7c 100644
--- a/engine/src/main/java/com/datatorrent/stram/LocalModeImpl.java
+++ b/engine/src/main/java/com/datatorrent/stram/LocalModeImpl.java
@@ -18,8 +18,14 @@
  */
 package com.datatorrent.stram;
 
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+
 import org.apache.hadoop.conf.Configuration;
 
+import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.LocalMode;
 import com.datatorrent.api.StreamingApplication;
@@ -66,9 +72,32 @@ public class LocalModeImpl extends LocalMode
   public Controller getController()
   {
     try {
+      addLibraryJarsToClasspath(lp);
       return new StramLocalCluster(lp);
     } catch (Exception e) {
       throw new RuntimeException("Error creating local cluster", e);
     }
   }
+
+  private void addLibraryJarsToClasspath(LogicalPlan lp) throws 
MalformedURLException
+  {
+    String libJarsCsv = 
lp.getAttributes().get(Context.DAGContext.LIBRARY_JARS);
+
+    if (libJarsCsv != null && libJarsCsv.length() != 0) {
+      String[] split = libJarsCsv.split(StramClient.LIB_JARS_SEP);
+      if (split.length != 0) {
+        URL[] urlList = new URL[split.length];
+        for (int i = 0; i < split.length; i++) {
+          File file = new File(split[i]);
+          urlList[i] = file.toURI().toURL();
+        }
+
+        // Set class loader.
+        ClassLoader prevCl = Thread.currentThread().getContextClassLoader();
+        URLClassLoader cl = URLClassLoader.newInstance(urlList, prevCl);
+        Thread.currentThread().setContextClassLoader(cl);
+      }
+    }
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f71bb0ed/engine/src/main/java/com/datatorrent/stram/StramClient.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java 
b/engine/src/main/java/com/datatorrent/stram/StramClient.java
index c1dfffd..daca67e 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramClient.java
@@ -74,6 +74,7 @@ import org.apache.log4j.DTLoggerFactory;
 import com.google.common.base.Objects;
 import com.google.common.collect.Lists;
 
+import com.datatorrent.api.Context;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.StorageAgent;
 import com.datatorrent.common.util.AsyncFSStorageAgent;
@@ -223,7 +224,7 @@ public class StramClient
       localJarFiles.add(jar);
     }
 
-    String libJarsPath = dag.getValue(LogicalPlan.LIBRARY_JARS);
+    String libJarsPath = dag.getValue(Context.DAGContext.LIBRARY_JARS);
     if (!StringUtils.isEmpty(libJarsPath)) {
       String[] libJars = StringUtils.splitByWholeSeparator(libJarsPath, 
LIB_JARS_SEP);
       localJarFiles.addAll(Arrays.asList(libJars));
@@ -442,7 +443,7 @@ public class StramClient
       String libJarsCsv = copyFromLocal(fs, appPath, localJarFiles.toArray(new 
String[]{}));
 
       LOG.info("libjars: {}", libJarsCsv);
-      dag.getAttributes().put(LogicalPlan.LIBRARY_JARS, libJarsCsv);
+      dag.getAttributes().put(Context.DAGContext.LIBRARY_JARS, libJarsCsv);
       LaunchContainerRunnable.addFilesToLocalResources(LocalResourceType.FILE, 
libJarsCsv, localResources, fs);
 
       if (archives != null) {

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f71bb0ed/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java 
b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 6a8ee9c..37f63b2 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -3131,7 +3131,7 @@ public class StreamingContainerManager implements 
PlanContext
 
       lp.setAttribute(LogicalPlan.APPLICATION_ID, appId);
       lp.setAttribute(LogicalPlan.APPLICATION_PATH, newApp.assertAppPath());
-      lp.setAttribute(LogicalPlan.LIBRARY_JARS, 
newApp.getValue(LogicalPlan.LIBRARY_JARS));
+      lp.setAttribute(Context.DAGContext.LIBRARY_JARS, 
newApp.getValue(Context.DAGContext.LIBRARY_JARS));
       lp.setAttribute(LogicalPlan.ARCHIVES, 
newApp.getValue(LogicalPlan.ARCHIVES));
 
       this.finals = new FinalVars(finals, lp);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f71bb0ed/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java 
b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
index a1197a5..bd58e35 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
@@ -285,7 +285,7 @@ public class StramAppLauncher
     // don't rely on object deserialization for changing the app id in the 
future.
     try {
       JSONObject attributes = metaJson.getJSONObject("attributes");
-      originalLibJars = 
attributes.getString(LogicalPlan.LIBRARY_JARS.getSimpleName());
+      originalLibJars = 
attributes.getString(Context.DAGContext.LIBRARY_JARS.getSimpleName());
       recoveryAppName = 
attributes.getString(Context.DAGContext.APPLICATION_NAME.getSimpleName());
     } catch (JSONException ex) {
       recoveryAppName = "Recovery App From " + originalAppId;
@@ -532,10 +532,22 @@ public class StramAppLauncher
    */
   public void runLocal(AppFactory appConfig) throws Exception
   {
+    propertiesBuilder.conf.setEnum(StreamingApplication.ENVIRONMENT, 
StreamingApplication.Environment.LOCAL);
+    LogicalPlan lp = appConfig.createApp(propertiesBuilder);
+
+    String libJarsCsv = 
lp.getAttributes().get(Context.DAGContext.LIBRARY_JARS);
+    if (libJarsCsv != null && libJarsCsv.length() != 0) {
+      String[] split = libJarsCsv.split(StramClient.LIB_JARS_SEP);
+      for (String jarPath : split) {
+        File file = new File(jarPath);
+        URL url = file.toURI().toURL();
+        launchDependencies.add(url);
+      }
+    }
+
     // local mode requires custom classes to be resolved through the context 
class loader
     loadDependencies();
-    propertiesBuilder.conf.setEnum(StreamingApplication.ENVIRONMENT, 
StreamingApplication.Environment.LOCAL);
-    StramLocalCluster lc = new 
StramLocalCluster(appConfig.createApp(propertiesBuilder));
+    StramLocalCluster lc = new StramLocalCluster(lp);
     lc.run();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f71bb0ed/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java 
b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index e3f6987..15969b7 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -61,6 +61,8 @@ import javax.validation.constraints.NotNull;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.input.ClassLoaderObjectInputStream;
 import org.apache.commons.lang.builder.ToStringBuilder;
 import org.apache.commons.lang.builder.ToStringStyle;
 import org.apache.hadoop.conf.Configuration;
@@ -158,13 +160,6 @@ public class LogicalPlan implements Serializable, DAG
   public static Attribute<String> KEY_TAB_FILE = new Attribute<>((String)null, 
new StringCodec.String2String());
   public static Attribute<Double> TOKEN_REFRESH_ANTICIPATORY_FACTOR = new 
Attribute<>(0.7);
   /**
-   * Comma separated list of jar file dependencies to be deployed with the 
application.
-   * The launcher will combine the list with built-in dependencies and those 
specified
-   * that are made available through the distributed file system to 
application master
-   * and child containers.
-   */
-  public static Attribute<String> LIBRARY_JARS = new Attribute<>(new 
StringCodec.String2String());
-  /**
    * Comma separated list of archives to be deployed with the application.
    * The launcher will include the archives into the final set of resources
    * that are made available through the distributed file system to 
application master
@@ -2386,7 +2381,7 @@ public class LogicalPlan implements Serializable, DAG
 
   public static LogicalPlan read(InputStream is) throws IOException, 
ClassNotFoundException
   {
-    return (LogicalPlan)new ObjectInputStream(is).readObject();
+    return (LogicalPlan)new 
ClassLoaderObjectInputStream(Thread.currentThread().getContextClassLoader(), 
is).readObject();
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f71bb0ed/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
----------------------------------------------------------------------
diff --git 
a/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java 
b/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
index e620141..1a5046c 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java
@@ -22,8 +22,11 @@ import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
 import java.io.LineNumberReader;
+import java.net.URL;
+import java.net.URLClassLoader;
 import java.util.Arrays;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 
 import org.junit.After;
 import org.junit.Assert;
@@ -33,7 +36,16 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.common.util.AsyncFSStorageAgent;
+import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.stram.StramLocalCluster.LocalStreamingContainer;
 import com.datatorrent.stram.StramLocalCluster.MockComponentFactory;
 import com.datatorrent.stram.api.Checkpoint;
@@ -48,6 +60,7 @@ import com.datatorrent.stram.plan.physical.PTOperator;
 import com.datatorrent.stram.support.ManualScheduledExecutorService;
 import com.datatorrent.stram.support.StramTestSupport;
 
+
 public class StramLocalClusterTest
 {
   private static final Logger LOG = 
LoggerFactory.getLogger(StramLocalClusterTest.class);
@@ -274,4 +287,113 @@ public class StramLocalClusterTest
     localCluster.shutdown();
   }
 
+  @Test
+  public void testDynamicLoading() throws Exception
+  {
+    String generatedJar = generatejar("POJO");
+    URLClassLoader uCl = URLClassLoader.newInstance(new URL[] {new 
File(generatedJar).toURI().toURL()});
+    Class<?> pojo = uCl.loadClass("POJO");
+
+    DynamicLoaderApp app = new DynamicLoaderApp();
+    app.generatedJar = generatedJar;
+    app.pojo = pojo;
+
+    LocalMode lma = LocalMode.newInstance();
+    lma.prepareDAG(app, new Configuration());
+    LocalMode.Controller lc = lma.getController();
+    lc.runAsync();
+    DynamicLoaderApp.latch.await();
+    Assert.assertTrue(DynamicLoaderApp.passed);
+    lc.shutdown();
+  }
+
+  static class DynamicLoaderApp implements StreamingApplication
+  {
+    static boolean passed = false;
+    static CountDownLatch latch = new CountDownLatch(2);
+
+    DynamicLoader test;
+    String generatedJar;
+    Class<?> pojo;
+
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      TestGeneratorInputOperator input = dag.addOperator("Input", new 
TestGeneratorInputOperator());
+      test = dag.addOperator("Test", new DynamicLoader());
+
+      dag.addStream("S1", input.outport, test.input);
+      dag.setAttribute(Context.DAGContext.LIBRARY_JARS, generatedJar);
+      dag.setInputPortAttribute(test.input, Context.PortContext.TUPLE_CLASS, 
pojo);
+    }
+  }
+
+  static class DynamicLoader extends BaseOperator
+  {
+    public final transient DefaultInputPort input = new DefaultInputPort()
+    {
+      @Override
+      public void setup(Context.PortContext context)
+      {
+        Class<?> value = context.getValue(Context.PortContext.TUPLE_CLASS);
+        if (value.getName().equals("POJO")) {
+          DynamicLoaderApp.passed = true;
+        } else {
+          DynamicLoaderApp.passed = false;
+        }
+        DynamicLoaderApp.latch.countDown();
+      }
+
+      @Override
+      public void process(Object tuple)
+      {
+      }
+    };
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      ClassLoader cl = Thread.currentThread().getContextClassLoader();
+      try {
+        cl.loadClass("POJO");
+      } catch (ClassNotFoundException e) {
+        DynamicLoaderApp.passed = false;
+        DynamicLoaderApp.latch.countDown();
+        throw new RuntimeException(e);
+      }
+
+      try {
+        Class.forName("POJO", true, 
Thread.currentThread().getContextClassLoader());
+      } catch (ClassNotFoundException e) {
+        DynamicLoaderApp.passed = false;
+        DynamicLoaderApp.latch.countDown();
+        throw new RuntimeException(e);
+      }
+
+      DynamicLoaderApp.passed = true;
+      DynamicLoaderApp.latch.countDown();
+    }
+  }
+
+  private String generatejar(String pojoClassName) throws IOException, 
InterruptedException
+  {
+    String sourceDir = "src/test/resources/dynamicJar/";
+    String destDir = testMeta.getPath();
+
+    Process p = Runtime.getRuntime()
+        .exec(new String[] {"javac", "-d", destDir, sourceDir + pojoClassName 
+ ".java"}, null, null);
+    IOUtils.copy(p.getInputStream(), System.out);
+    IOUtils.copy(p.getErrorStream(), System.err);
+    Assert.assertEquals(0, p.waitFor());
+
+    p = Runtime.getRuntime()
+        .exec(new String[] {"jar", "-cf", pojoClassName + ".jar", 
pojoClassName + ".class"}, null, new File(destDir));
+    IOUtils.copy(p.getInputStream(), System.out);
+    IOUtils.copy(p.getErrorStream(), System.err);
+    Assert.assertEquals(0, p.waitFor());
+
+    return new File(destDir, pojoClassName + ".jar").getAbsolutePath();
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f71bb0ed/engine/src/test/resources/dynamicJar/POJO.java
----------------------------------------------------------------------
diff --git a/engine/src/test/resources/dynamicJar/POJO.java 
b/engine/src/test/resources/dynamicJar/POJO.java
new file mode 100644
index 0000000..8868544
--- /dev/null
+++ b/engine/src/test/resources/dynamicJar/POJO.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * This file is not directly used anywhere. This will be compiled and packages 
as jar by unit test to verify
+ * dynamic loading.
+ */
+public class POJO
+{
+  private int a;
+  private String b;
+
+  public int getA()
+  {
+    return a;
+  }
+
+  public void setA(int a)
+  {
+    this.a = a;
+  }
+
+  public String getB()
+  {
+    return b;
+  }
+
+  public void setB(String b)
+  {
+    this.b = b;
+  }
+}
+

Reply via email to