Repository: ignite Updated Branches: refs/heads/ignite-3914 [created] c2867a75a
IGNITE-3914: refactored HadoopClassLoader : extracted hadoop-specific logic into separate processor. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/64b00945 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/64b00945 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/64b00945 Branch: refs/heads/ignite-3914 Commit: 64b0094533f7ea85d1fa37794165a94b4ef27d2d Parents: 2fe0272 Author: iveselovskiy <iveselovs...@gridgain.com> Authored: Fri Sep 16 16:52:17 2016 +0300 Committer: iveselovskiy <iveselovs...@gridgain.com> Committed: Fri Sep 16 16:52:17 2016 +0300 ---------------------------------------------------------------------- .../ignite/internal/GridKernalContext.java | 8 + .../ignite/internal/GridKernalContextImpl.java | 12 + .../ignite/internal/IgniteComponentType.java | 8 + .../apache/ignite/internal/IgniteKernal.java | 3 + .../processors/hadoop/HadoopHelper.java | 66 ++ .../processors/hadoop/HadoopJobInfo.java | 4 +- .../processors/hadoop/HadoopNoopHelper.java | 40 ++ .../processors/hadoop/HadoopClassLoader.java | 16 +- .../hadoop/HadoopClassLoaderUtils.java | 684 ------------------ .../processors/hadoop/HadoopDefaultJobInfo.java | 6 +- .../processors/hadoop/HadoopHelperImpl.java | 699 +++++++++++++++++++ .../hadoop/jobtracker/HadoopJobTracker.java | 6 +- .../child/HadoopChildProcessRunner.java | 9 +- .../processors/hadoop/v2/HadoopV2Job.java | 9 +- .../hadoop/HadoopClassLoaderTest.java | 2 +- .../processors/hadoop/HadoopPlannerMockJob.java | 2 +- .../processors/hadoop/HadoopSnappyTest.java | 2 +- .../processors/hadoop/HadoopTasksV1Test.java | 2 +- .../processors/hadoop/HadoopTasksV2Test.java | 2 +- .../processors/hadoop/HadoopV2JobSelfTest.java | 2 +- .../collections/HadoopAbstractMapTest.java | 3 +- 21 files changed, 879 insertions(+), 706 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/ignite/blob/64b00945/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index 3eaef1e..b123a4a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -44,6 +44,7 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor; import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor; import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor; import org.apache.ignite.internal.processors.hadoop.HadoopProcessorAdapter; +import org.apache.ignite.internal.processors.hadoop.HadoopHelper; import org.apache.ignite.internal.processors.igfs.IgfsHelper; import org.apache.ignite.internal.processors.igfs.IgfsProcessorAdapter; import org.apache.ignite.internal.processors.job.GridJobProcessor; @@ -285,6 +286,13 @@ public interface GridKernalContext extends Iterable<GridComponent> { public HadoopProcessorAdapter hadoop(); /** + * Gets Hadoop helper. + * + * @return Hadoop helper. + */ + public HadoopHelper hadoopHelper(); + + /** * Gets utility cache pool. * * @return Utility cache pool. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/64b00945/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index 1ff4543..eb214e8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -60,6 +60,7 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor; import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor; import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor; import org.apache.ignite.internal.processors.hadoop.HadoopProcessorAdapter; +import org.apache.ignite.internal.processors.hadoop.HadoopHelper; import org.apache.ignite.internal.processors.igfs.IgfsHelper; import org.apache.ignite.internal.processors.igfs.IgfsProcessorAdapter; import org.apache.ignite.internal.processors.job.GridJobProcessor; @@ -238,6 +239,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable /** */ @GridToStringInclude + private HadoopHelper hadoopHelper; + + /** */ + @GridToStringInclude private GridSegmentationProcessor segProc; /** */ @@ -541,6 +546,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable if (helper instanceof IgfsHelper) igfsHelper = (IgfsHelper)helper; + else if (helper instanceof HadoopHelper) + hadoopHelper = (HadoopHelper)helper; else assert false : "Unknown helper class: " + helper.getClass(); } @@ -733,6 +740,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable } /** {@inheritDoc} */ + @Override public HadoopHelper hadoopHelper() { + return hadoopHelper; + } + + /** {@inheritDoc} */ 
@Override public GridContinuousProcessor continuous() { return contProc; } http://git-wip-us.apache.org/repos/asf/ignite/blob/64b00945/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java index 76e495f..2e35dc6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal; import java.lang.reflect.Constructor; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.hadoop.HadoopNoopHelper; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.jetbrains.annotations.Nullable; @@ -41,6 +42,13 @@ public enum IgniteComponentType { "ignite-hadoop" ), + /** Hadoop Helper component. */ + HADOOP_HELPER( + HadoopNoopHelper.class.getName(), + "org.apache.ignite.internal.processors.hadoop.HadoopHelperImpl", + "ignite-hadoop" + ), + /** IGFS helper component. 
*/ IGFS_HELPER( "org.apache.ignite.internal.processors.igfs.IgfsNoopHelper", http://git-wip-us.apache.org/repos/asf/ignite/blob/64b00945/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index b85692e..49c994a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -188,6 +188,7 @@ import static org.apache.ignite.internal.GridKernalState.STARTED; import static org.apache.ignite.internal.GridKernalState.STARTING; import static org.apache.ignite.internal.GridKernalState.STOPPED; import static org.apache.ignite.internal.GridKernalState.STOPPING; +import static org.apache.ignite.internal.IgniteComponentType.HADOOP_HELPER; import static org.apache.ignite.internal.IgniteComponentType.IGFS; import static org.apache.ignite.internal.IgniteComponentType.IGFS_HELPER; import static org.apache.ignite.internal.IgniteComponentType.SCHEDULE; @@ -821,6 +822,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { addHelper(IGFS_HELPER.create(F.isEmpty(cfg.getFileSystemConfiguration()))); + addHelper(HADOOP_HELPER.createIfInClassPath(ctx, false)); + startProcessor(new IgnitePluginProcessor(ctx, cfg, plugins)); // Off-heap processor has no dependencies. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/64b00945/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopHelper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopHelper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopHelper.java new file mode 100644 index 0000000..4ef3ff0 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopHelper.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import java.io.InputStream; +import org.jetbrains.annotations.Nullable; + +/** + * Hadoop helper: utility methods used by the Hadoop class loader. + */ +public interface HadoopHelper { + /** + * Loads special replacement class and impersonates the original one. + * + * @param in Input stream. + * @param originalName Original class name. + * @param replaceName Replacer class name. + * @return Result. + */ + public abstract byte[] loadReplace(InputStream in, final String originalName, final String replaceName); + + /** + * @param cls Class name. 
+ * @return {@code true} If this is Hadoop class. + */ + public abstract boolean isHadoop(String cls); + + /** + * Need to parse only Ignite Hadoop and IGFS classes. + * + * @param cls Class name. + * @return {@code true} if we need to check this class. + */ + public abstract boolean isHadoopIgfs(String cls); + + /** + * @param ldr Loader. + * @param clsName Class. + * @return Input stream. + */ + @Nullable public abstract InputStream loadClassBytes(ClassLoader ldr, String clsName); + + /** + * Check whether class has external dependencies on Hadoop. + * + * @param clsName Class name. + * @param parentClsLdr Parent class loader. + * @return {@code True} if class has external dependencies. + */ + public abstract boolean hasExternalDependencies(String clsName, ClassLoader parentClsLdr); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/64b00945/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java index a3b1bb6..f358c7f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java @@ -59,11 +59,13 @@ public interface HadoopJobInfo extends Serializable { * @param jobId Job ID. * @param log Logger. * @param libNames Optional additional native library names. + * @param hadoopHelper HadoopHelper. * @return Job. * @throws IgniteCheckedException If failed. */ public HadoopJob createJob(Class<? 
extends HadoopJob> jobCls, - HadoopJobId jobId, IgniteLogger log, @Nullable String[] libNames) throws IgniteCheckedException; + HadoopJobId jobId, IgniteLogger log, @Nullable String[] libNames, HadoopHelper hadoopHelper) + throws IgniteCheckedException; /** * @return Number of reducers configured for job. http://git-wip-us.apache.org/repos/asf/ignite/blob/64b00945/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopNoopHelper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopNoopHelper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopNoopHelper.java new file mode 100644 index 0000000..c09668e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopNoopHelper.java @@ -0,0 +1,40 @@ +package org.apache.ignite.internal.processors.hadoop; + +import java.io.InputStream; +import org.apache.ignite.internal.GridKernalContext; +import org.jetbrains.annotations.Nullable; + +/** + * Noop Hadoop Helper implementation. + */ +public class HadoopNoopHelper implements HadoopHelper { + /** Constructor required by the engine. 
+ public HadoopNoopHelper(GridKernalContext ctx) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean hasExternalDependencies(String clsName, ClassLoader parentClsLdr) { + return false; + } + + /** {@inheritDoc} */ + @Override public byte[] loadReplace(InputStream in, String originalName, String replaceName) { + return new byte[0]; + } + + /** {@inheritDoc} */ + @Override public boolean isHadoop(String cls) { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean isHadoopIgfs(String cls) { + return false; + } + + /** {@inheritDoc} */ + @Nullable @Override public InputStream loadClassBytes(ClassLoader ldr, String clsName) { + return null; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/64b00945/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java index 30a6e72..b0e148b 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java @@ -89,6 +89,9 @@ public class HadoopClassLoader extends URLClassLoader implements ClassCache { /** Native library names. */ private final String[] libNames; + /** Hadoop helper. */ + private final HadoopHelper hadoopHelper; + /** * Gets name for Job class loader. The name is specific for local node id. * @param locNodeId The local node id. @@ -118,13 +121,14 @@ public class HadoopClassLoader extends URLClassLoader implements ClassCache { * @param name Classloader name. * @param libNames Optional additional native library names to be linked from parent classloader. 
*/ - public HadoopClassLoader(URL[] urls, String name, @Nullable String[] libNames) { + public HadoopClassLoader(URL[] urls, String name, @Nullable String[] libNames, HadoopHelper hadoopHelper) { super(addHadoopUrls(urls), APP_CLS_LDR); assert !(getParent() instanceof HadoopClassLoader); this.name = name; this.libNames = libNames; + this.hadoopHelper = hadoopHelper; initializeNativeLibraries(); } @@ -190,7 +194,7 @@ public class HadoopClassLoader extends URLClassLoader implements ClassCache { @Override protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException { try { // Always load Hadoop classes explicitly, since Hadoop can be available in App classpath. - if (HadoopClassLoaderUtils.isHadoop(name)) { + if (hadoopHelper.isHadoop(name)) { if (name.equals(CLS_SHUTDOWN_HOOK_MANAGER)) // Dirty hack to get rid of Hadoop shutdown hooks. return loadReplace(name, CLS_SHUTDOWN_HOOK_MANAGER_REPLACE); else if (name.equals(CLS_DAEMON)) @@ -202,7 +206,7 @@ public class HadoopClassLoader extends URLClassLoader implements ClassCache { } // For Ignite Hadoop and IGFS classes we have to check if they depend on Hadoop. - if (HadoopClassLoaderUtils.isHadoopIgfs(name)) { + if (hadoopHelper.isHadoopIgfs(name)) { if (hasExternalDependencies(name)) return loadClassExplicitly(name, resolve); } @@ -232,9 +236,9 @@ public class HadoopClassLoader extends URLClassLoader implements ClassCache { byte[] bytes = bytesCache.get(originalName); if (bytes == null) { - InputStream in = HadoopClassLoaderUtils.loadClassBytes(getParent(), replaceName); + InputStream in = hadoopHelper.loadClassBytes(getParent(), replaceName); - bytes = HadoopClassLoaderUtils.loadReplace(in, originalName, replaceName); + bytes = hadoopHelper.loadReplace(in, originalName, replaceName); bytesCache.put(originalName, bytes); } @@ -292,7 +296,7 @@ public class HadoopClassLoader extends URLClassLoader implements ClassCache { * @return {@code True} if class has external dependencies. 
*/ boolean hasExternalDependencies(String clsName) { - return HadoopClassLoaderUtils.hasExternalDependencies(clsName, getParent()); + return hadoopHelper.hasExternalDependencies(clsName, getParent()); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/64b00945/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderUtils.java deleted file mode 100644 index 3415d6a..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderUtils.java +++ /dev/null @@ -1,684 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.internal.util.typedef.F; -import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentHashMap8; -import org.objectweb.asm.AnnotationVisitor; -import org.objectweb.asm.Attribute; -import org.objectweb.asm.ClassReader; -import org.objectweb.asm.ClassVisitor; -import org.objectweb.asm.ClassWriter; -import org.objectweb.asm.FieldVisitor; -import org.objectweb.asm.Handle; -import org.objectweb.asm.Label; -import org.objectweb.asm.MethodVisitor; -import org.objectweb.asm.Opcodes; -import org.objectweb.asm.Type; -import org.objectweb.asm.commons.Remapper; -import org.objectweb.asm.commons.RemappingClassAdapter; - -import java.io.IOException; -import java.io.InputStream; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -/** - * Utility methods for Hadoop classloader required to avoid direct 3rd-party dependencies in class loader. - */ -public class HadoopClassLoaderUtils { - /** Cache for resolved dependency info. */ - private static final Map<String, Boolean> dependenciesCache = new ConcurrentHashMap8<>(); - - /** - * Load special replacement and impersonate - * - * @param in Input stream. - * @param originalName Original class name. - * @param replaceName Replacer class name. - * @return Result. 
- */ - public static byte[] loadReplace(InputStream in, final String originalName, final String replaceName) { - ClassReader rdr; - - try { - rdr = new ClassReader(in); - } - catch (IOException e) { - throw new RuntimeException(e); - } - - ClassWriter w = new ClassWriter(Opcodes.ASM4); - - rdr.accept(new RemappingClassAdapter(w, new Remapper() { - /** */ - String replaceType = replaceName.replace('.', '/'); - - /** */ - String nameType = originalName.replace('.', '/'); - - @Override public String map(String type) { - if (type.equals(replaceType)) - return nameType; - - return type; - } - }), ClassReader.EXPAND_FRAMES); - - return w.toByteArray(); - } - - /** - * @param cls Class name. - * @return {@code true} If this is Hadoop class. - */ - public static boolean isHadoop(String cls) { - return cls.startsWith("org.apache.hadoop."); - } - - /** - * Need to parse only Ignite Hadoop and IGFS classes. - * - * @param cls Class name. - * @return {@code true} if we need to check this class. - */ - public static boolean isHadoopIgfs(String cls) { - String ignitePkgPrefix = "org.apache.ignite"; - - int len = ignitePkgPrefix.length(); - - return cls.startsWith(ignitePkgPrefix) && ( - cls.indexOf("igfs.", len) != -1 || - cls.indexOf(".fs.", len) != -1 || - cls.indexOf("hadoop.", len) != -1); - } - - /** - * @param ldr Loader. - * @param clsName Class. - * @return Input stream. - */ - @Nullable public static InputStream loadClassBytes(ClassLoader ldr, String clsName) { - return ldr.getResourceAsStream(clsName.replace('.', '/') + ".class"); - } - - /** - * Check whether class has external dependencies on Hadoop. - * - * @param clsName Class name. - * @param parentClsLdr Parent class loader. - * @return {@code True} if class has external dependencies. 
- */ - static boolean hasExternalDependencies(String clsName, ClassLoader parentClsLdr) { - Boolean hasDeps = dependenciesCache.get(clsName); - - if (hasDeps == null) { - CollectingContext ctx = new CollectingContext(parentClsLdr); - - ctx.annVisitor = new CollectingAnnotationVisitor(ctx); - ctx.mthdVisitor = new CollectingMethodVisitor(ctx, ctx.annVisitor); - ctx.fldVisitor = new CollectingFieldVisitor(ctx, ctx.annVisitor); - ctx.clsVisitor = new CollectingClassVisitor(ctx, ctx.annVisitor, ctx.mthdVisitor, ctx.fldVisitor); - - hasDeps = hasExternalDependencies(clsName, parentClsLdr, ctx); - - dependenciesCache.put(clsName, hasDeps); - } - - return hasDeps; - } - - /** - * Check whether class has external dependencies on Hadoop. - * - * @param clsName Class name. - * @param parentClsLdr Parent class loader. - * @param ctx Context. - * @return {@code true} If the class has external dependencies. - */ - static boolean hasExternalDependencies(String clsName, ClassLoader parentClsLdr, CollectingContext ctx) { - if (isHadoop(clsName)) // Hadoop must not be in classpath but Idea sucks, so filtering explicitly as external. - return true; - - // Try to get from parent to check if the type accessible. - InputStream in = loadClassBytes(parentClsLdr, clsName); - - if (in == null) // The class is external itself, it must be loaded from this class loader. - return true; - - if (!isHadoopIgfs(clsName)) // Other classes should not have external dependencies. - return false; - - final ClassReader rdr; - - try { - rdr = new ClassReader(in); - } - catch (IOException e) { - throw new RuntimeException("Failed to read class: " + clsName, e); - } - - ctx.visited.add(clsName); - - rdr.accept(ctx.clsVisitor, 0); - - if (ctx.found) // We already know that we have dependencies, no need to check parent. - return true; - - // Here we are known to not have any dependencies but possibly we have a parent which has them. - int idx = clsName.lastIndexOf('$'); - - if (idx == -1) // No parent class. 
- return false; - - String parentCls = clsName.substring(0, idx); - - if (ctx.visited.contains(parentCls)) - return false; - - Boolean res = dependenciesCache.get(parentCls); - - if (res == null) - res = hasExternalDependencies(parentCls, parentClsLdr, ctx); - - return res; - } - - /** - * @param name Class name. - * @return {@code true} If this is a valid class name. - */ - private static boolean validateClassName(String name) { - int len = name.length(); - - if (len <= 1) - return false; - - if (!Character.isJavaIdentifierStart(name.charAt(0))) - return false; - - boolean hasDot = false; - - for (int i = 1; i < len; i++) { - char c = name.charAt(i); - - if (c == '.') - hasDot = true; - else if (!Character.isJavaIdentifierPart(c)) - return false; - } - - return hasDot; - } - - /** - * Context for dependencies collection. - */ - private static class CollectingContext { - /** Visited classes. */ - private final Set<String> visited = new HashSet<>(); - - /** Parent class loader. */ - private final ClassLoader parentClsLdr; - - /** Whether dependency found. */ - private boolean found; - - /** Annotation visitor. */ - private AnnotationVisitor annVisitor; - - /** Method visitor. */ - private MethodVisitor mthdVisitor; - - /** Field visitor. */ - private FieldVisitor fldVisitor; - - /** Class visitor. */ - private ClassVisitor clsVisitor; - - /** - * Constrcutor. - * - * @param parentClsLdr Parent class loader. - */ - private CollectingContext(ClassLoader parentClsLdr) { - this.parentClsLdr = parentClsLdr; - } - - /** - * Processes a method descriptor - * @param methDesc The method desc String. - */ - void onMethodsDesc(final String methDesc) { - // Process method return type: - onType(Type.getReturnType(methDesc)); - - if (found) - return; - - // Process method argument types: - for (Type t: Type.getArgumentTypes(methDesc)) { - onType(t); - - if (found) - return; - } - } - - /** - * Processes dependencies of a class. 
- * - * @param depCls The class name as dot-notated FQN. - */ - void onClass(final String depCls) { - assert depCls.indexOf('/') == -1 : depCls; // class name should be fully converted to dot notation. - assert depCls.charAt(0) != 'L' : depCls; - assert validateClassName(depCls) : depCls; - - if (depCls.startsWith("java.") || depCls.startsWith("javax.")) // Filter out platform classes. - return; - - if (visited.contains(depCls)) - return; - - Boolean res = dependenciesCache.get(depCls); - - if (res == Boolean.TRUE || (res == null && hasExternalDependencies(depCls, parentClsLdr, this))) - found = true; - } - - /** - * Analyses dependencies of given type. - * - * @param t The type to process. - */ - void onType(Type t) { - if (t == null) - return; - - int sort = t.getSort(); - - switch (sort) { - case Type.ARRAY: - onType(t.getElementType()); - - break; - - case Type.OBJECT: - onClass(t.getClassName()); - - break; - } - } - - /** - * Analyses dependencies of given object type. - * - * @param objType The object type to process. - */ - void onInternalTypeName(String objType) { - if (objType == null) - return; - - assert objType.length() > 1 : objType; - - if (objType.charAt(0) == '[') - // handle array. In this case this is a type descriptor notation, like "[Ljava/lang/Object;" - onType(objType); - else { - assert objType.indexOf('.') == -1 : objType; // Must be slash-separated FQN. - - String clsName = objType.replace('/', '.'); // Convert it to dot notation. - - onClass(clsName); // Process. - } - } - - /** - * Type description analyser. - * - * @param desc The description. - */ - void onType(String desc) { - if (!F.isEmpty(desc)) { - if (desc.length() <= 1) - return; // Optimization: filter out primitive types in early stage. - - Type t = Type.getType(desc); - - onType(t); - } - } - } - - /** - * Annotation visitor. 
- */ - private static class CollectingAnnotationVisitor extends AnnotationVisitor { - /** */ - final CollectingContext ctx; - - /** - * Annotation visitor. - * - * @param ctx The collector. - */ - CollectingAnnotationVisitor(CollectingContext ctx) { - super(Opcodes.ASM4); - - this.ctx = ctx; - } - - /** {@inheritDoc} */ - @Override public AnnotationVisitor visitAnnotation(String name, String desc) { - if (ctx.found) - return null; - - ctx.onType(desc); - - return this; - } - - /** {@inheritDoc} */ - @Override public void visitEnum(String name, String desc, String val) { - if (ctx.found) - return; - - ctx.onType(desc); - } - - /** {@inheritDoc} */ - @Override public AnnotationVisitor visitArray(String name) { - return ctx.found ? null : this; - } - - /** {@inheritDoc} */ - @Override public void visit(String name, Object val) { - if (ctx.found) - return; - - if (val instanceof Type) - ctx.onType((Type)val); - } - - /** {@inheritDoc} */ - @Override public void visitEnd() { - // No-op. - } - } - - /** - * Field visitor. - */ - private static class CollectingFieldVisitor extends FieldVisitor { - /** Collector. */ - private final CollectingContext ctx; - - /** Annotation visitor. */ - private final AnnotationVisitor av; - - /** - * Constructor. - */ - CollectingFieldVisitor(CollectingContext ctx, AnnotationVisitor av) { - super(Opcodes.ASM4); - - this.ctx = ctx; - this.av = av; - } - - /** {@inheritDoc} */ - @Override public AnnotationVisitor visitAnnotation(String desc, boolean visible) { - if (ctx.found) - return null; - - ctx.onType(desc); - - return ctx.found ? null : av; - } - - /** {@inheritDoc} */ - @Override public void visitAttribute(Attribute attr) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void visitEnd() { - // No-op. - } - } - - /** - * Class visitor. - */ - private static class CollectingClassVisitor extends ClassVisitor { - /** Collector. */ - private final CollectingContext ctx; - - /** Annotation visitor. 
*/ - private final AnnotationVisitor av; - - /** Method visitor. */ - private final MethodVisitor mv; - - /** Field visitor. */ - private final FieldVisitor fv; - - /** - * Constructor. - * - * @param ctx Collector. - * @param av Annotation visitor. - * @param mv Method visitor. - * @param fv Field visitor. - */ - CollectingClassVisitor(CollectingContext ctx, AnnotationVisitor av, MethodVisitor mv, FieldVisitor fv) { - super(Opcodes.ASM4); - - this.ctx = ctx; - this.av = av; - this.mv = mv; - this.fv = fv; - } - - /** {@inheritDoc} */ - @Override public void visit(int i, int i2, String name, String signature, String superName, String[] ifaces) { - if (ctx.found) - return; - - ctx.onInternalTypeName(superName); - - if (ctx.found) - return; - - if (ifaces != null) { - for (String iface : ifaces) { - ctx.onInternalTypeName(iface); - - if (ctx.found) - return; - } - } - } - - /** {@inheritDoc} */ - @Override public AnnotationVisitor visitAnnotation(String desc, boolean visible) { - if (ctx.found) - return null; - - ctx.onType(desc); - - return ctx.found ? null : av; - } - - /** {@inheritDoc} */ - @Override public void visitInnerClass(String name, String outerName, String innerName, int i) { - if (ctx.found) - return; - - ctx.onInternalTypeName(name); - } - - /** {@inheritDoc} */ - @Override public FieldVisitor visitField(int i, String name, String desc, String signature, Object val) { - if (ctx.found) - return null; - - ctx.onType(desc); - - return ctx.found ? null : fv; - } - - /** {@inheritDoc} */ - @Override public MethodVisitor visitMethod(int i, String name, String desc, String signature, - String[] exceptions) { - if (ctx.found) - return null; - - ctx.onMethodsDesc(desc); - - // Process declared method exceptions: - if (exceptions != null) { - for (String e : exceptions) - ctx.onInternalTypeName(e); - } - - return ctx.found ? null : mv; - } - } - - /** - * Method visitor. - */ - private static class CollectingMethodVisitor extends MethodVisitor { - /** Collector. 
*/ - private final CollectingContext ctx; - - /** Annotation visitor. */ - private final AnnotationVisitor av; - - /** - * Constructor. - * - * @param ctx Collector. - * @param av Annotation visitor. - */ - private CollectingMethodVisitor(CollectingContext ctx, AnnotationVisitor av) { - super(Opcodes.ASM4); - - this.ctx = ctx; - this.av = av; - } - - /** {@inheritDoc} */ - @Override public AnnotationVisitor visitAnnotation(String desc, boolean visible) { - if (ctx.found) - return null; - - ctx.onType(desc); - - return ctx.found ? null : av; - } - - /** {@inheritDoc} */ - @Override public AnnotationVisitor visitParameterAnnotation(int i, String desc, boolean b) { - if (ctx.found) - return null; - - ctx.onType(desc); - - return ctx.found ? null : av; - } - - /** {@inheritDoc} */ - @Override public AnnotationVisitor visitAnnotationDefault() { - return ctx.found ? null : av; - } - - /** {@inheritDoc} */ - @Override public void visitFieldInsn(int opcode, String owner, String name, String desc) { - if (ctx.found) - return; - - ctx.onInternalTypeName(owner); - - if (ctx.found) - return; - - ctx.onType(desc); - } - - /** {@inheritDoc} */ - @Override public void visitInvokeDynamicInsn(String name, String desc, Handle bsm, Object... bsmArgs) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void visitFrame(int type, int nLoc, Object[] locTypes, int nStack, Object[] stackTypes) { - // No-op. 
- } - - /** {@inheritDoc} */ - @Override public void visitLocalVariable(String name, String desc, String signature, Label lb, - Label lb2, int i) { - if (ctx.found) - return; - - ctx.onType(desc); - } - - /** {@inheritDoc} */ - @Override public void visitMethodInsn(int i, String owner, String name, String desc) { - if (ctx.found) - return; - - ctx.onInternalTypeName(owner); - - if (ctx.found) - return; - - ctx.onMethodsDesc(desc); - } - - /** {@inheritDoc} */ - @Override public void visitMultiANewArrayInsn(String desc, int dim) { - if (ctx.found) - return; - - ctx.onType(desc); - } - - /** {@inheritDoc} */ - @Override public void visitTryCatchBlock(Label start, Label end, Label hndl, String typeStr) { - if (ctx.found) - return; - - ctx.onInternalTypeName(typeStr); - } - - /** {@inheritDoc} */ - @Override public void visitTypeInsn(int opcode, String type) { - if (ctx.found) - return; - - ctx.onInternalTypeName(type); - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/64b00945/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java index 1382c1f..576a766 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java @@ -83,14 +83,14 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable { /** {@inheritDoc} */ @Override public HadoopJob createJob(Class<? 
extends HadoopJob> jobCls, HadoopJobId jobId, IgniteLogger log, - @Nullable String[] libNames) throws IgniteCheckedException { + @Nullable String[] libNames, HadoopHelper hadoopHelper) throws IgniteCheckedException { assert jobCls != null; try { Constructor<? extends HadoopJob> constructor = jobCls.getConstructor(HadoopJobId.class, - HadoopDefaultJobInfo.class, IgniteLogger.class, String[].class); + HadoopDefaultJobInfo.class, IgniteLogger.class, String[].class, HadoopHelper.class); - return constructor.newInstance(jobId, this, log, libNames); + return constructor.newInstance(jobId, this, log, libNames, hadoopHelper); } catch (Throwable t) { if (t instanceof Error) http://git-wip-us.apache.org/repos/asf/ignite/blob/64b00945/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopHelperImpl.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopHelperImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopHelperImpl.java new file mode 100644 index 0000000..a06d3ba --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopHelperImpl.java @@ -0,0 +1,699 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.util.typedef.F; +import org.jetbrains.annotations.Nullable; +import org.jsr166.ConcurrentHashMap8; +import org.objectweb.asm.AnnotationVisitor; +import org.objectweb.asm.Attribute; +import org.objectweb.asm.ClassReader; +import org.objectweb.asm.ClassVisitor; +import org.objectweb.asm.ClassWriter; +import org.objectweb.asm.FieldVisitor; +import org.objectweb.asm.Handle; +import org.objectweb.asm.Label; +import org.objectweb.asm.MethodVisitor; +import org.objectweb.asm.Opcodes; +import org.objectweb.asm.Type; +import org.objectweb.asm.commons.Remapper; +import org.objectweb.asm.commons.RemappingClassAdapter; + +import java.io.IOException; +import java.io.InputStream; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * Utility methods for Hadoop classloader required to avoid direct 3rd-party dependencies in class loader. + */ +public class HadoopHelperImpl implements HadoopHelper { + /** Cache for resolved dependency info. */ + private static final Map<String, Boolean> dependenciesCache = new ConcurrentHashMap8<>(); + + /** */ + public HadoopHelperImpl() { + this(null); + } + + /** Constructor required by the engine. */ + public HadoopHelperImpl(GridKernalContext ctx) { + // No-op. + } + + /** + * Load special replacement and impersonate + * + * @param in Input stream. + * @param originalName Original class name. + * @param replaceName Replacer class name. + * @return Result.
+ */ + @Override public byte[] loadReplace(InputStream in, final String originalName, final String replaceName) { + ClassReader rdr; + + try { + rdr = new ClassReader(in); + } + catch (IOException e) { + throw new RuntimeException(e); + } + + ClassWriter w = new ClassWriter(Opcodes.ASM4); + + rdr.accept(new RemappingClassAdapter(w, new Remapper() { + /** */ + String replaceType = replaceName.replace('.', '/'); + + /** */ + String nameType = originalName.replace('.', '/'); + + @Override public String map(String type) { + if (type == null) + return null; + + if (type.equals(replaceType)) + return nameType; + + return type; + } + }), ClassReader.EXPAND_FRAMES); + + return w.toByteArray(); + } + + /** + * @param cls Class name. + * @return {@code true} If this is Hadoop class. + */ + @Override public boolean isHadoop(String cls) { + return cls.startsWith("org.apache.hadoop."); + } + + /** + * Need to parse only Ignite Hadoop and IGFS classes. + * + * @param cls Class name. + * @return {@code true} if we need to check this class. + */ + @Override public boolean isHadoopIgfs(String cls) { + String ignitePkgPrefix = "org.apache.ignite"; + + int len = ignitePkgPrefix.length(); + + return cls.startsWith(ignitePkgPrefix) && ( + cls.indexOf("igfs.", len) != -1 || + cls.indexOf(".fs.", len) != -1 || + cls.indexOf("hadoop.", len) != -1); + } + + /** + * @param ldr Loader. + * @param clsName Class. + * @return Input stream. + */ + @Override @Nullable public InputStream loadClassBytes(ClassLoader ldr, String clsName) { + return ldr.getResourceAsStream(clsName.replace('.', '/') + ".class"); + } + + /** + * Check whether class has external dependencies on Hadoop. + * + * @param clsName Class name. + * @param parentClsLdr Parent class loader. + * @return {@code True} if class has external dependencies. 
+ */ + @Override public boolean hasExternalDependencies(String clsName, ClassLoader parentClsLdr) { + Boolean hasDeps = dependenciesCache.get(clsName); + + if (hasDeps == null) { + CollectingContext ctx = new CollectingContext(parentClsLdr); + + ctx.annVisitor = new CollectingAnnotationVisitor(ctx); + ctx.mthdVisitor = new CollectingMethodVisitor(ctx, ctx.annVisitor); + ctx.fldVisitor = new CollectingFieldVisitor(ctx, ctx.annVisitor); + ctx.clsVisitor = new CollectingClassVisitor(ctx, ctx.annVisitor, ctx.mthdVisitor, ctx.fldVisitor); + + hasDeps = hasExternalDependencies(this, clsName, parentClsLdr, ctx); + + dependenciesCache.put(clsName, hasDeps); + } + + return hasDeps; + } + + /** + * Check whether class has external dependencies on Hadoop. + * + * @param clsName Class name. + * @param parentClsLdr Parent class loader. + * @param ctx Context. + * @return {@code true} If the class has external dependencies. + */ + static boolean hasExternalDependencies(HadoopHelper h, String clsName, ClassLoader parentClsLdr, CollectingContext ctx) { + if (h.isHadoop(clsName)) // Hadoop must not be in classpath but Idea sucks, so filtering explicitly as external. + return true; + + // Try to get from parent to check if the type accessible. + InputStream in = h.loadClassBytes(parentClsLdr, clsName); + + if (in == null) // The class is external itself, it must be loaded from this class loader. + return true; + + if (!h.isHadoopIgfs(clsName)) // Other classes should not have external dependencies. + return false; + + final ClassReader rdr; + + try { + rdr = new ClassReader(in); + } + catch (IOException e) { + throw new RuntimeException("Failed to read class: " + clsName, e); + } + + ctx.visited.add(clsName); + + rdr.accept(ctx.clsVisitor, 0); + + if (ctx.found) // We already know that we have dependencies, no need to check parent. + return true; + + // Here we are known to not have any dependencies but possibly we have a parent which has them. 
+ int idx = clsName.lastIndexOf('$'); + + if (idx == -1) // No parent class. + return false; + + String parentCls = clsName.substring(0, idx); + + if (ctx.visited.contains(parentCls)) + return false; + + Boolean res = dependenciesCache.get(parentCls); + + if (res == null) + res = hasExternalDependencies(h, parentCls, parentClsLdr, ctx); + + return res; + } + + /** + * @param name Class name. + * @return {@code true} If this is a valid class name. + */ + private static boolean validateClassName(String name) { + int len = name.length(); + + if (len <= 1) + return false; + + if (!Character.isJavaIdentifierStart(name.charAt(0))) + return false; + + boolean hasDot = false; + + for (int i = 1; i < len; i++) { + char c = name.charAt(i); + + if (c == '.') + hasDot = true; + else if (!Character.isJavaIdentifierPart(c)) + return false; + } + + return hasDot; + } + + /** + * Context for dependencies collection. + */ + private class CollectingContext { + /** Visited classes. */ + private final Set<String> visited = new HashSet<>(); + + /** Parent class loader. */ + private final ClassLoader parentClsLdr; + + /** Whether dependency found. */ + private boolean found; + + /** Annotation visitor. */ + private AnnotationVisitor annVisitor; + + /** Method visitor. */ + private MethodVisitor mthdVisitor; + + /** Field visitor. */ + private FieldVisitor fldVisitor; + + /** Class visitor. */ + private ClassVisitor clsVisitor; + + /** + * Constructor. + * + * @param parentClsLdr Parent class loader. + */ + private CollectingContext(ClassLoader parentClsLdr) { + this.parentClsLdr = parentClsLdr; + } + + /** + * Processes a method descriptor + * @param methDesc The method desc String.
+ */ + void onMethodsDesc(final String methDesc) { + // Process method return type: + onType(Type.getReturnType(methDesc)); + + if (found) + return; + + // Process method argument types: + for (Type t: Type.getArgumentTypes(methDesc)) { + onType(t); + + if (found) + return; + } + } + + /** + * Processes dependencies of a class. + * + * @param depCls The class name as dot-notated FQN. + */ + void onClass(final String depCls) { + assert depCls.indexOf('/') == -1 : depCls; // class name should be fully converted to dot notation. + assert depCls.charAt(0) != 'L' : depCls; + assert validateClassName(depCls) : depCls; + + if (depCls.startsWith("java.") || depCls.startsWith("javax.")) // Filter out platform classes. + return; + + if (visited.contains(depCls)) + return; + + Boolean res = dependenciesCache.get(depCls); + + if (res == Boolean.TRUE || (res == null && hasExternalDependencies(HadoopHelperImpl.this, depCls, + parentClsLdr, this))) + found = true; + } + + /** + * Analyses dependencies of given type. + * + * @param t The type to process. + */ + void onType(Type t) { + if (t == null) + return; + + int sort = t.getSort(); + + switch (sort) { + case Type.ARRAY: + onType(t.getElementType()); + + break; + + case Type.OBJECT: + onClass(t.getClassName()); + + break; + } + } + + /** + * Analyses dependencies of given object type. + * + * @param objType The object type to process. + */ + void onInternalTypeName(String objType) { + if (objType == null) + return; + + assert objType.length() > 1 : objType; + + if (objType.charAt(0) == '[') + // handle array. In this case this is a type descriptor notation, like "[Ljava/lang/Object;" + onType(objType); + else { + assert objType.indexOf('.') == -1 : objType; // Must be slash-separated FQN. + + String clsName = objType.replace('/', '.'); // Convert it to dot notation. + + onClass(clsName); // Process. + } + } + + /** + * Type description analyser. + * + * @param desc The description. 
+ */ + void onType(String desc) { + if (!F.isEmpty(desc)) { + if (desc.length() <= 1) + return; // Optimization: filter out primitive types in early stage. + + Type t = Type.getType(desc); + + onType(t); + } + } + } + + /** + * Annotation visitor. + */ + private static class CollectingAnnotationVisitor extends AnnotationVisitor { + /** */ + final CollectingContext ctx; + + /** + * Annotation visitor. + * + * @param ctx The collector. + */ + CollectingAnnotationVisitor(CollectingContext ctx) { + super(Opcodes.ASM4); + + this.ctx = ctx; + } + + /** {@inheritDoc} */ + @Override public AnnotationVisitor visitAnnotation(String name, String desc) { + if (ctx.found) + return null; + + ctx.onType(desc); + + return this; + } + + /** {@inheritDoc} */ + @Override public void visitEnum(String name, String desc, String val) { + if (ctx.found) + return; + + ctx.onType(desc); + } + + /** {@inheritDoc} */ + @Override public AnnotationVisitor visitArray(String name) { + return ctx.found ? null : this; + } + + /** {@inheritDoc} */ + @Override public void visit(String name, Object val) { + if (ctx.found) + return; + + if (val instanceof Type) + ctx.onType((Type)val); + } + + /** {@inheritDoc} */ + @Override public void visitEnd() { + // No-op. + } + } + + /** + * Field visitor. + */ + private static class CollectingFieldVisitor extends FieldVisitor { + /** Collector. */ + private final CollectingContext ctx; + + /** Annotation visitor. */ + private final AnnotationVisitor av; + + /** + * Constructor. + */ + CollectingFieldVisitor(CollectingContext ctx, AnnotationVisitor av) { + super(Opcodes.ASM4); + + this.ctx = ctx; + this.av = av; + } + + /** {@inheritDoc} */ + @Override public AnnotationVisitor visitAnnotation(String desc, boolean visible) { + if (ctx.found) + return null; + + ctx.onType(desc); + + return ctx.found ? null : av; + } + + /** {@inheritDoc} */ + @Override public void visitAttribute(Attribute attr) { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public void visitEnd() { + // No-op. + } + } + + /** + * Class visitor. + */ + private static class CollectingClassVisitor extends ClassVisitor { + /** Collector. */ + private final CollectingContext ctx; + + /** Annotation visitor. */ + private final AnnotationVisitor av; + + /** Method visitor. */ + private final MethodVisitor mv; + + /** Field visitor. */ + private final FieldVisitor fv; + + /** + * Constructor. + * + * @param ctx Collector. + * @param av Annotation visitor. + * @param mv Method visitor. + * @param fv Field visitor. + */ + CollectingClassVisitor(CollectingContext ctx, AnnotationVisitor av, MethodVisitor mv, FieldVisitor fv) { + super(Opcodes.ASM4); + + this.ctx = ctx; + this.av = av; + this.mv = mv; + this.fv = fv; + } + + /** {@inheritDoc} */ + @Override public void visit(int i, int i2, String name, String signature, String superName, String[] ifaces) { + if (ctx.found) + return; + + ctx.onInternalTypeName(superName); + + if (ctx.found) + return; + + if (ifaces != null) { + for (String iface : ifaces) { + ctx.onInternalTypeName(iface); + + if (ctx.found) + return; + } + } + } + + /** {@inheritDoc} */ + @Override public AnnotationVisitor visitAnnotation(String desc, boolean visible) { + if (ctx.found) + return null; + + ctx.onType(desc); + + return ctx.found ? null : av; + } + + /** {@inheritDoc} */ + @Override public void visitInnerClass(String name, String outerName, String innerName, int i) { + if (ctx.found) + return; + + ctx.onInternalTypeName(name); + } + + /** {@inheritDoc} */ + @Override public FieldVisitor visitField(int i, String name, String desc, String signature, Object val) { + if (ctx.found) + return null; + + ctx.onType(desc); + + return ctx.found ? 
null : fv; + } + + /** {@inheritDoc} */ + @Override public MethodVisitor visitMethod(int i, String name, String desc, String signature, + String[] exceptions) { + if (ctx.found) + return null; + + ctx.onMethodsDesc(desc); + + // Process declared method exceptions: + if (exceptions != null) { + for (String e : exceptions) + ctx.onInternalTypeName(e); + } + + return ctx.found ? null : mv; + } + } + + /** + * Method visitor. + */ + private static class CollectingMethodVisitor extends MethodVisitor { + /** Collector. */ + private final CollectingContext ctx; + + /** Annotation visitor. */ + private final AnnotationVisitor av; + + /** + * Constructor. + * + * @param ctx Collector. + * @param av Annotation visitor. + */ + private CollectingMethodVisitor(CollectingContext ctx, AnnotationVisitor av) { + super(Opcodes.ASM4); + + this.ctx = ctx; + this.av = av; + } + + /** {@inheritDoc} */ + @Override public AnnotationVisitor visitAnnotation(String desc, boolean visible) { + if (ctx.found) + return null; + + ctx.onType(desc); + + return ctx.found ? null : av; + } + + /** {@inheritDoc} */ + @Override public AnnotationVisitor visitParameterAnnotation(int i, String desc, boolean b) { + if (ctx.found) + return null; + + ctx.onType(desc); + + return ctx.found ? null : av; + } + + /** {@inheritDoc} */ + @Override public AnnotationVisitor visitAnnotationDefault() { + return ctx.found ? null : av; + } + + /** {@inheritDoc} */ + @Override public void visitFieldInsn(int opcode, String owner, String name, String desc) { + if (ctx.found) + return; + + ctx.onInternalTypeName(owner); + + if (ctx.found) + return; + + ctx.onType(desc); + } + + /** {@inheritDoc} */ + @Override public void visitInvokeDynamicInsn(String name, String desc, Handle bsm, Object... bsmArgs) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void visitFrame(int type, int nLoc, Object[] locTypes, int nStack, Object[] stackTypes) { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public void visitLocalVariable(String name, String desc, String signature, Label lb, + Label lb2, int i) { + if (ctx.found) + return; + + ctx.onType(desc); + } + + /** {@inheritDoc} */ + @Override public void visitMethodInsn(int i, String owner, String name, String desc) { + if (ctx.found) + return; + + ctx.onInternalTypeName(owner); + + if (ctx.found) + return; + + ctx.onMethodsDesc(desc); + } + + /** {@inheritDoc} */ + @Override public void visitMultiANewArrayInsn(String desc, int dim) { + if (ctx.found) + return; + + ctx.onType(desc); + } + + /** {@inheritDoc} */ + @Override public void visitTryCatchBlock(Label start, Label end, Label hndl, String typeStr) { + if (ctx.found) + return; + + ctx.onInternalTypeName(typeStr); + } + + /** {@inheritDoc} */ + @Override public void visitTypeInsn(int opcode, String type) { + if (ctx.found) + return; + + ctx.onInternalTypeName(type); + } + } + +} http://git-wip-us.apache.org/repos/asf/ignite/blob/64b00945/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java index f3e17f3..a2c55a2 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java @@ -162,7 +162,8 @@ public class HadoopJobTracker extends HadoopComponent { if (ctx.configuration() != null) libNames = ctx.configuration().getNativeLibraryNames(); - HadoopClassLoader ldr = new HadoopClassLoader(null, HadoopClassLoader.nameForJob(nodeId), libNames); + HadoopClassLoader ldr = new HadoopClassLoader(null, 
HadoopClassLoader.nameForJob(nodeId), libNames, + ctx.kernalContext().hadoopHelper()); try { jobCls = (Class<HadoopV2Job>)ldr.loadClass(HadoopV2Job.class.getName()); @@ -1060,7 +1061,8 @@ public class HadoopJobTracker extends HadoopComponent { jobInfo = meta.jobInfo(); } - job = jobInfo.createJob(jobCls, jobId, log, ctx.configuration().getNativeLibraryNames()); + job = jobInfo.createJob(jobCls, jobId, log, ctx.configuration().getNativeLibraryNames(), + ctx.kernalContext().hadoopHelper()); job.initialize(false, ctx.localNodeId()); http://git-wip-us.apache.org/repos/asf/ignite/blob/64b00945/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java index 4a946e9..2ca9928 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java @@ -23,6 +23,8 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.hadoop.HadoopHelper; +import org.apache.ignite.internal.processors.hadoop.HadoopHelperImpl; import org.apache.ignite.internal.processors.hadoop.HadoopJob; import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; @@ -96,6 +98,11 @@ public class HadoopChildProcessRunner { /** 
Shuffle job. */ private HadoopShuffleJob<HadoopProcessDescriptor> shuffleJob; + /** Hadoop helper. + * NB: we consider helper to be thread-safe, so reusing the same instance for all the jobs. + */ + private final HadoopHelper hadoopHelper = new HadoopHelperImpl(); + /** Concurrent mappers. */ private int concMappers; @@ -134,7 +141,7 @@ public class HadoopChildProcessRunner { assert job == null; - job = req.jobInfo().createJob(HadoopV2Job.class, req.jobId(), log, null); + job = req.jobInfo().createJob(HadoopV2Job.class, req.jobId(), log, null, hadoopHelper); job.initialize(true, nodeDesc.processId()); http://git-wip-us.apache.org/repos/asf/ignite/blob/64b00945/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java index 595474c..111022b 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java @@ -52,6 +52,7 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader; import org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo; import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock; +import org.apache.ignite.internal.processors.hadoop.HadoopHelper; import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; import org.apache.ignite.internal.processors.hadoop.HadoopJob; import org.apache.ignite.internal.processors.hadoop.HadoopJobId; @@ -87,6 +88,9 @@ public class HadoopV2Job implements HadoopJob { /** */ private final JobContextImpl jobCtx; + /** */ + private final HadoopHelper hadoopHelper; + /** Hadoop job ID. 
*/ private final HadoopJobId jobId; @@ -130,13 +134,14 @@ public class HadoopV2Job implements HadoopJob { * @param libNames Optional additional native library names. */ public HadoopV2Job(HadoopJobId jobId, final HadoopDefaultJobInfo jobInfo, IgniteLogger log, - @Nullable String[] libNames) { + @Nullable String[] libNames, HadoopHelper hadoopHelper) { assert jobId != null; assert jobInfo != null; this.jobId = jobId; this.jobInfo = jobInfo; this.libNames = libNames; + this.hadoopHelper = hadoopHelper; ClassLoader oldLdr = HadoopUtils.setContextClassLoader(getClass().getClassLoader()); @@ -255,7 +260,7 @@ public class HadoopV2Job implements HadoopJob { // Note that the classloader identified by the task it was initially created for, // but later it may be reused for other tasks. HadoopClassLoader ldr = new HadoopClassLoader(rsrcMgr.classPath(), - HadoopClassLoader.nameForTask(info, false), libNames); + HadoopClassLoader.nameForTask(info, false), libNames, hadoopHelper); cls = (Class<? extends HadoopTaskContext>)ldr.loadClass(HadoopV2TaskContext.class.getName()); http://git-wip-us.apache.org/repos/asf/ignite/blob/64b00945/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java index 2fd7777..e202f48 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java @@ -49,7 +49,7 @@ import org.apache.ignite.internal.processors.hadoop.deps.Without; */ public class HadoopClassLoaderTest extends TestCase { /** */ - final HadoopClassLoader ldr = new HadoopClassLoader(null, "test", null); + 
final HadoopClassLoader ldr = new HadoopClassLoader(null, "test", null, new HadoopHelperImpl()); /** * @throws Exception If failed. http://git-wip-us.apache.org/repos/asf/ignite/blob/64b00945/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPlannerMockJob.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPlannerMockJob.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPlannerMockJob.java index 88d0f80..a4d1d24 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPlannerMockJob.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPlannerMockJob.java @@ -145,7 +145,7 @@ public class HadoopPlannerMockJob implements HadoopJob { /** {@inheritDoc} */ @Override public HadoopJob createJob(Class<? extends HadoopJob> jobCls, HadoopJobId jobId, IgniteLogger log, - @Nullable String[] libNames) throws IgniteCheckedException { + @Nullable String[] libNames, HadoopHelper hadoopHelper) throws IgniteCheckedException { throwUnsupported(); return null; http://git-wip-us.apache.org/repos/asf/ignite/blob/64b00945/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java index b4e3dc2..656ba66 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java @@ -49,7 +49,7 @@ public class HadoopSnappyTest extends GridCommonAbstractTest { // Run the same in several more class 
loaders simulating jobs and tasks: for (int i = 0; i < 2; i++) { - ClassLoader hadoopClsLdr = new HadoopClassLoader(null, "cl-" + i, null); + ClassLoader hadoopClsLdr = new HadoopClassLoader(null, "cl-" + i, null, new HadoopHelperImpl()); Class<?> cls = (Class)Class.forName(HadoopSnappyTest.class.getName(), true, hadoopClsLdr); http://git-wip-us.apache.org/repos/asf/ignite/blob/64b00945/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java index 27d7fc2..f914467 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java @@ -48,7 +48,7 @@ public class HadoopTasksV1Test extends HadoopTasksAllVersionsTest { HadoopJobId jobId = new HadoopJobId(uuid, 0); - return jobInfo.createJob(HadoopV2Job.class, jobId, log, null); + return jobInfo.createJob(HadoopV2Job.class, jobId, log, null, new HadoopHelperImpl()); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/64b00945/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java index 30cf50c..faec383 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java @@ -67,7 +67,7 
@@ public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest { HadoopJobId jobId = new HadoopJobId(uuid, 0); - return jobInfo.createJob(HadoopV2Job.class, jobId, log, null); + return jobInfo.createJob(HadoopV2Job.class, jobId, log, null, new HadoopHelperImpl()); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/64b00945/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java index ae2c00d..6b974bd 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java @@ -78,7 +78,7 @@ public class HadoopV2JobSelfTest extends HadoopAbstractSelfTest { HadoopJobId id = new HadoopJobId(uuid, 1); - HadoopJob job = info.createJob(HadoopV2Job.class, id, log, null); + HadoopJob job = info.createJob(HadoopV2Job.class, id, log, null, new HadoopHelperImpl()); HadoopTaskContext taskCtx = job.getTaskContext(new HadoopTaskInfo(HadoopTaskType.MAP, null, 0, 0, null)); http://git-wip-us.apache.org/repos/asf/ignite/blob/64b00945/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java index 5266875..02d4063 100644 --- 
a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java @@ -23,6 +23,7 @@ import org.apache.commons.collections.comparators.ComparableComparator; import org.apache.hadoop.io.IntWritable; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.processors.hadoop.HadoopHelper; import org.apache.ignite.internal.processors.hadoop.HadoopJob; import org.apache.ignite.internal.processors.hadoop.HadoopJobId; import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo; @@ -144,7 +145,7 @@ public abstract class HadoopAbstractMapTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override public HadoopJob createJob(Class<? extends HadoopJob> jobCls, HadoopJobId jobId, IgniteLogger log, - @Nullable String[] libNames) throws IgniteCheckedException { + @Nullable String[] libNames, HadoopHelper hadoopHelper) throws IgniteCheckedException { assert false; return null;