IGNITE-4426: Hadoop: tasks can share the same classloader. This closes #1344.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/30b869dd Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/30b869dd Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/30b869dd Branch: refs/heads/master Commit: 30b869ddd32db637ee9ea8f13a115dd4bacc52fe Parents: b44baf1 Author: devozerov <[email protected]> Authored: Wed Dec 14 14:35:29 2016 +0300 Committer: devozerov <[email protected]> Committed: Thu Dec 15 13:46:34 2016 +0300 ---------------------------------------------------------------------- .../processors/hadoop/HadoopClassLoader.java | 10 +++++++++ .../processors/hadoop/HadoopJobProperty.java | 8 +++++++ .../processors/hadoop/impl/v2/HadoopV2Job.java | 22 ++++++++++++++++++-- 3 files changed, 38 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/30b869dd/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java index cd94c89..f6c2fa9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java @@ -101,6 +101,16 @@ public class HadoopClassLoader extends URLClassLoader implements ClassCache { } /** + * Classloader name for job. + * + * @param jobId Job ID. + * @return Name. + */ + public static String nameForJob(HadoopJobId jobId) { + return "hadoop-job-" + jobId; + } + + /** * Gets name for the task class loader. Task class loader * @param info The task info. * @param prefix Get only prefix (without task type and number) http://git-wip-us.apache.org/repos/asf/ignite/blob/30b869dd/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java index 1f0ef1b..9e1dede 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java @@ -57,6 +57,13 @@ public enum HadoopJobProperty { JOB_STATUS_POLL_DELAY("ignite.job.status.poll.delay"), /** + * Whether job classloader can be shared between all tasks. + * <p> + * Defaults to {@code true}. + */ + JOB_SHARED_CLASSLOADER("ignite.job.shared.classloader"), + + /** * Size in bytes of single memory page which will be allocated for data structures in shuffle. * <p> * By default is {@code 32 * 1024}. @@ -105,6 +112,7 @@ public enum HadoopJobProperty { */ SHUFFLE_JOB_THROTTLE("ignite.shuffle.job.throttle"); + /** Property name. */ private final String propName; http://git-wip-us.apache.org/repos/asf/ignite/blob/30b869dd/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java index 36da410..a24e581 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java @@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; import org.apache.ignite.internal.processors.hadoop.HadoopJob; import org.apache.ignite.internal.processors.hadoop.HadoopJobId; import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo; +import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty; import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; import org.apache.ignite.internal.processors.hadoop.HadoopTaskType; @@ -73,6 +74,7 @@ import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; +import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.JOB_SHARED_CLASSLOADER; import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.jobLocalDir; import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.taskLocalDir; import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.transformException; @@ -121,6 +123,9 @@ public class HadoopV2Job implements HadoopJob { /** File system cache map. */ private final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fsMap = createHadoopLazyConcurrentMap(); + /** Shared class loader. */ + private volatile HadoopClassLoader sharedClsLdr; + /** Local node ID */ private volatile UUID locNodeId; @@ -261,8 +266,8 @@ public class HadoopV2Job implements HadoopJob { // If there is no pooled class, then load new one. // Note that the classloader identified by the task it was initially created for, // but later it may be reused for other tasks. - HadoopClassLoader ldr = new HadoopClassLoader(rsrcMgr.classPath(), - HadoopClassLoader.nameForTask(info, false), libNames, helper); + HadoopClassLoader ldr = sharedClsLdr != null ? + sharedClsLdr : createClassLoader(HadoopClassLoader.nameForTask(info, false)); cls = (Class<? extends HadoopTaskContext>)ldr.loadClass(HadoopV2TaskContext.class.getName()); @@ -312,6 +317,9 @@ public class HadoopV2Job implements HadoopJob { try { rsrcMgr.prepareJobEnvironment(!external, jobLocalDir(igniteWorkDirectory(), locNodeId, jobId)); + + if (HadoopJobProperty.get(jobInfo, JOB_SHARED_CLASSLOADER, true)) + sharedClsLdr = createClassLoader(HadoopClassLoader.nameForJob(jobId)); } finally { HadoopCommonUtils.restoreContextClassLoader(oldLdr); @@ -454,4 +462,14 @@ public class HadoopV2Job implements HadoopJob { public FileSystem fileSystem(@Nullable URI uri, Configuration cfg) throws IOException { return fileSystemForMrUserWithCaching(uri, cfg, fsMap); } + + /** + * Create class loader with the given name. + * + * @param name Name. + * @return Class loader. + */ + private HadoopClassLoader createClassLoader(String name) { + return new HadoopClassLoader(rsrcMgr.classPath(), name, libNames, helper); + } } \ No newline at end of file
