IGNITE-3341: Hadoop: Added ability to link user-defined native libraries to HadoopClassLoader.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/68b25df3 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/68b25df3 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/68b25df3 Branch: refs/heads/ignite-1232 Commit: 68b25df3b3e1979c0972d75675f2c9aa2b48d9a7 Parents: a527bf8 Author: vozerov-gridgain <[email protected]> Authored: Tue Jun 21 16:57:28 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Tue Jun 21 17:11:22 2016 +0300 ---------------------------------------------------------------------- .../configuration/HadoopConfiguration.java | 35 ++++++++++++++++++++ .../processors/hadoop/HadoopJobInfo.java | 3 +- .../processors/hadoop/HadoopClassLoader.java | 26 +++++++++++++-- .../processors/hadoop/HadoopDefaultJobInfo.java | 8 ++--- .../hadoop/jobtracker/HadoopJobTracker.java | 14 ++++++-- .../child/HadoopChildProcessRunner.java | 2 +- .../processors/hadoop/v2/HadoopV2Job.java | 14 ++++++-- .../hadoop/HadoopClassLoaderTest.java | 2 +- .../processors/hadoop/HadoopSnappyTest.java | 2 +- .../processors/hadoop/HadoopTasksV1Test.java | 2 +- .../processors/hadoop/HadoopTasksV2Test.java | 2 +- .../processors/hadoop/HadoopV2JobSelfTest.java | 2 +- .../collections/HadoopAbstractMapTest.java | 6 ++-- 13 files changed, 96 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/68b25df3/modules/core/src/main/java/org/apache/ignite/configuration/HadoopConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/HadoopConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/HadoopConfiguration.java index 95ce9d3..5f68c84 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/HadoopConfiguration.java +++ 
b/modules/core/src/main/java/org/apache/ignite/configuration/HadoopConfiguration.java @@ -17,8 +17,10 @@ package org.apache.ignite.configuration; +import org.apache.ignite.lifecycle.LifecycleBean; import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlanner; import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; /** * Ignite Hadoop Accelerator configuration. @@ -51,6 +53,9 @@ public class HadoopConfiguration { /** */ private int maxTaskQueueSize = DFLT_MAX_TASK_QUEUE_SIZE; + /** Library names. */ + private String[] libNames; + /** * Default constructor. */ @@ -71,6 +76,7 @@ public class HadoopConfiguration { planner = cfg.getMapReducePlanner(); maxParallelTasks = cfg.getMaxParallelTasks(); maxTaskQueueSize = cfg.getMaxTaskQueueSize(); + libNames = cfg.getNativeLibraryNames(); } /** @@ -169,6 +175,35 @@ public class HadoopConfiguration { this.planner = planner; } + /** + * Get native library names. + * <p> + * Ignite Hadoop Accelerator executes all Hadoop jobs and tasks in the same process, isolating them with help + * of classloaders. If Hadoop job or task loads a native library, it might lead to an exception, because Java does + * not allow loading the same library multiple times from different classloaders. To overcome the problem, + * you should do the following: + * <ul> + * <li>Load necessary libraries in advance from base classloader; {@link LifecycleBean} is a good candidate + * for this;</li> + * <li>Add names of loaded libraries to this property, so that Hadoop engine is able to link them;</li> + * <li>Remove {@link System#load(String)} and {@link System#loadLibrary(String)} calls from your job/task.</li> * + * </ul> + * + * @return Native library names. + */ + @Nullable public String[] getNativeLibraryNames() { + return libNames; + } + + /** + * Set native library names. See {@link #getNativeLibraryNames()} for more information. + * + * @param libNames Native library names. 
+ */ + public void setNativeLibraryNames(@Nullable String... libNames) { + this.libNames = libNames; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(HadoopConfiguration.class, this, super.toString()); http://git-wip-us.apache.org/repos/asf/ignite/blob/68b25df3/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java index eda8e97..a3b1bb6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java @@ -58,11 +58,12 @@ public interface HadoopJobInfo extends Serializable { * @param jobCls The job class. * @param jobId Job ID. * @param log Logger. + * @param libNames Optional additional native library names. * @return Job. * @throws IgniteCheckedException If failed. */ public HadoopJob createJob(Class<? extends HadoopJob> jobCls, - HadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException; + HadoopJobId jobId, IgniteLogger log, @Nullable String[] libNames) throws IgniteCheckedException; /** * @return Number of reducers configured for job. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/68b25df3/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java index 4448b2d..340b35b 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java @@ -99,6 +99,9 @@ public class HadoopClassLoader extends URLClassLoader implements ClassCache { @SuppressWarnings({"FieldCanBeLocal", "UnusedDeclaration"}) private final String name; + /** Native library names. */ + private final String[] libNames; + /** * Gets name for Job class loader. The name is specific for local node id. * @param locNodeId The local node id. @@ -122,14 +125,19 @@ public class HadoopClassLoader extends URLClassLoader implements ClassCache { } /** + * Constructor. + * * @param urls Urls. + * @param name Classloader name. + * @param libNames Optional additional native library names to be linked from parent classloader. 
*/ - public HadoopClassLoader(URL[] urls, String name) { + public HadoopClassLoader(URL[] urls, String name, @Nullable String[] libNames) { super(addHadoopUrls(urls), APP_CLS_LDR); assert !(getParent() instanceof HadoopClassLoader); this.name = name; + this.libNames = libNames; initializeNativeLibraries(); } @@ -159,9 +167,21 @@ public class HadoopClassLoader extends URLClassLoader implements ClassCache { Vector vector = U.field(ldr, "nativeLibraries"); for (Object lib : vector) { - String libName = U.field(lib, "name"); + String name = U.field(lib, "name"); + + boolean add = name.contains(LIBHADOOP); + + if (!add && libNames != null) { + for (String libName : libNames) { + if (libName != null && name.contains(libName)) { + add = true; + + break; + } + } + } - if (libName.contains(LIBHADOOP)) { + if (add) { curVector.add(lib); return; http://git-wip-us.apache.org/repos/asf/ignite/blob/68b25df3/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java index fe125fe..be2d9ca 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java @@ -82,15 +82,15 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable { } /** {@inheritDoc} */ - @Override public HadoopJob createJob(Class<? extends HadoopJob> jobCls, - HadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException { + @Override public HadoopJob createJob(Class<? 
extends HadoopJob> jobCls, HadoopJobId jobId, IgniteLogger log, + @Nullable String[] libNames) throws IgniteCheckedException { assert jobCls != null; try { Constructor<? extends HadoopJob> constructor = jobCls.getConstructor(HadoopJobId.class, - HadoopDefaultJobInfo.class, IgniteLogger.class); + HadoopDefaultJobInfo.class, IgniteLogger.class, String[].class); - return constructor.newInstance(jobId, this, log); + return constructor.newInstance(jobId, this, log, libNames); } // NB: java.lang.NoClassDefFoundError may be thrown from Class#getConstructor() call. catch (Throwable t) { http://git-wip-us.apache.org/repos/asf/ignite/blob/68b25df3/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java index cdd8103..f3e17f3 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java @@ -157,7 +157,12 @@ public class HadoopJobTracker extends HadoopComponent { assert jobCls == null; - HadoopClassLoader ldr = new HadoopClassLoader(null, HadoopClassLoader.nameForJob(nodeId)); + String[] libNames = null; + + if (ctx.configuration() != null) + libNames = ctx.configuration().getNativeLibraryNames(); + + HadoopClassLoader ldr = new HadoopClassLoader(null, HadoopClassLoader.nameForJob(nodeId), libNames); try { jobCls = (Class<HadoopV2Job>)ldr.loadClass(HadoopV2Job.class.getName()); @@ -727,6 +732,7 @@ public class HadoopJobTracker extends HadoopComponent { * @param jobId Job ID. * @param plan Map-reduce plan. 
*/ + @SuppressWarnings({"unused", "ConstantConditions" }) private void printPlan(HadoopJobId jobId, HadoopMapReducePlan plan) { log.info("Plan for " + jobId); @@ -886,6 +892,8 @@ public class HadoopJobTracker extends HadoopComponent { finishFut.onDone(jobId, meta.failCause()); } + assert job != null; + if (ctx.jobUpdateLeader()) job.cleanupStagingDirectory(); @@ -1052,7 +1060,7 @@ public class HadoopJobTracker extends HadoopComponent { jobInfo = meta.jobInfo(); } - job = jobInfo.createJob(jobCls, jobId, log); + job = jobInfo.createJob(jobCls, jobId, log, ctx.configuration().getNativeLibraryNames()); job.initialize(false, ctx.localNodeId()); @@ -1667,7 +1675,7 @@ public class HadoopJobTracker extends HadoopComponent { if (val != null) e.setValue(val); else - e.remove();; + e.remove(); return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/68b25df3/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java index a949141..4a946e9 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java @@ -134,7 +134,7 @@ public class HadoopChildProcessRunner { assert job == null; - job = req.jobInfo().createJob(HadoopV2Job.class, req.jobId(), log); + job = req.jobInfo().createJob(HadoopV2Job.class, req.jobId(), log, null); job.initialize(true, nodeDesc.processId()); 
http://git-wip-us.apache.org/repos/asf/ignite/blob/68b25df3/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java index b69447d..8804e29 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java @@ -93,6 +93,9 @@ public class HadoopV2Job implements HadoopJob { /** Job info. */ protected final HadoopJobInfo jobInfo; + /** Native library names. */ + private final String[] libNames; + /** */ private final JobID hadoopJobID; @@ -119,16 +122,21 @@ public class HadoopV2Job implements HadoopJob { private volatile byte[] jobConfData; /** + * Constructor. + * * @param jobId Job ID. * @param jobInfo Job info. * @param log Logger. + * @param libNames Optional additional native library names. 
*/ - public HadoopV2Job(HadoopJobId jobId, final HadoopDefaultJobInfo jobInfo, IgniteLogger log) { + public HadoopV2Job(HadoopJobId jobId, final HadoopDefaultJobInfo jobInfo, IgniteLogger log, + @Nullable String[] libNames) { assert jobId != null; assert jobInfo != null; this.jobId = jobId; this.jobInfo = jobInfo; + this.libNames = libNames; hadoopJobID = new JobID(jobId.globalId().toString(), jobId.localId()); @@ -220,7 +228,7 @@ public class HadoopV2Job implements HadoopJob { } /** {@inheritDoc} */ - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "MismatchedQueryAndUpdateOfCollection" }) @Override public HadoopTaskContext getTaskContext(HadoopTaskInfo info) throws IgniteCheckedException { T2<HadoopTaskType, Integer> locTaskId = new T2<>(info.type(), info.taskNumber()); @@ -242,7 +250,7 @@ public class HadoopV2Job implements HadoopJob { // Note that the classloader identified by the task it was initially created for, // but later it may be reused for other tasks. HadoopClassLoader ldr = new HadoopClassLoader(rsrcMgr.classPath(), - HadoopClassLoader.nameForTask(info, false)); + HadoopClassLoader.nameForTask(info, false), libNames); cls = (Class<? 
extends HadoopTaskContext>)ldr.loadClass(HadoopV2TaskContext.class.getName()); http://git-wip-us.apache.org/repos/asf/ignite/blob/68b25df3/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java index 55fac2c..02d98d0 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java @@ -49,7 +49,7 @@ import org.apache.ignite.internal.processors.hadoop.deps.Without; */ public class HadoopClassLoaderTest extends TestCase { /** */ - final HadoopClassLoader ldr = new HadoopClassLoader(null, "test"); + final HadoopClassLoader ldr = new HadoopClassLoader(null, "test", null); /** * @throws Exception If failed. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/68b25df3/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java index 014ff1e..b4e3dc2 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java @@ -49,7 +49,7 @@ public class HadoopSnappyTest extends GridCommonAbstractTest { // Run the same in several more class loaders simulating jobs and tasks: for (int i = 0; i < 2; i++) { - ClassLoader hadoopClsLdr = new HadoopClassLoader(null, "cl-" + i); + ClassLoader hadoopClsLdr = new HadoopClassLoader(null, "cl-" + i, null); Class<?> cls = (Class)Class.forName(HadoopSnappyTest.class.getName(), true, hadoopClsLdr); http://git-wip-us.apache.org/repos/asf/ignite/blob/68b25df3/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java index 6ba9686..27d7fc2 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java @@ -48,7 +48,7 @@ public class HadoopTasksV1Test extends HadoopTasksAllVersionsTest { HadoopJobId jobId = new HadoopJobId(uuid, 0); - return jobInfo.createJob(HadoopV2Job.class, jobId, log); + return jobInfo.createJob(HadoopV2Job.class, 
jobId, log, null); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/68b25df3/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java index d125deb..30cf50c 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java @@ -67,7 +67,7 @@ public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest { HadoopJobId jobId = new HadoopJobId(uuid, 0); - return jobInfo.createJob(HadoopV2Job.class, jobId, log); + return jobInfo.createJob(HadoopV2Job.class, jobId, log, null); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/68b25df3/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java index 1e9ffbc..ae2c00d 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java @@ -78,7 +78,7 @@ public class HadoopV2JobSelfTest extends HadoopAbstractSelfTest { HadoopJobId id = new HadoopJobId(uuid, 1); - HadoopJob job = info.createJob(HadoopV2Job.class, id, log); + HadoopJob job = info.createJob(HadoopV2Job.class, id, log, null); HadoopTaskContext taskCtx = job.getTaskContext(new 
HadoopTaskInfo(HadoopTaskType.MAP, null, 0, 0, null)); http://git-wip-us.apache.org/repos/asf/ignite/blob/68b25df3/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java index 493098f..5266875 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java @@ -77,11 +77,13 @@ public abstract class HadoopAbstractMapTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Override public Comparator<Object> sortComparator() { return ComparableComparator.getInstance(); } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Override public Comparator<Object> groupComparator() { return ComparableComparator.getInstance(); } @@ -141,8 +143,8 @@ public abstract class HadoopAbstractMapTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override public HadoopJob createJob(Class<? extends HadoopJob> jobCls, - HadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException { + @Override public HadoopJob createJob(Class<? extends HadoopJob> jobCls, HadoopJobId jobId, IgniteLogger log, + @Nullable String[] libNames) throws IgniteCheckedException { assert false; return null;
