http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java deleted file mode 100644 index 2e0e271..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java +++ /dev/null @@ -1,964 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import java.io.IOException; -import java.io.InputStream; -import java.net.URL; -import java.net.URLClassLoader; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.Vector; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import org.apache.hadoop.util.NativeCodeLoader; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.processors.hadoop.v2.HadoopDaemon; -import org.apache.ignite.internal.processors.hadoop.v2.HadoopShutdownHookManager; -import org.apache.ignite.internal.util.ClassCache; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentHashMap8; -import org.objectweb.asm.AnnotationVisitor; -import org.objectweb.asm.Attribute; -import org.objectweb.asm.ClassReader; -import org.objectweb.asm.ClassVisitor; -import org.objectweb.asm.ClassWriter; -import org.objectweb.asm.FieldVisitor; -import org.objectweb.asm.Handle; -import org.objectweb.asm.Label; -import org.objectweb.asm.MethodVisitor; -import org.objectweb.asm.Opcodes; -import org.objectweb.asm.Type; -import org.objectweb.asm.commons.Remapper; -import org.objectweb.asm.commons.RemappingClassAdapter; - -/** - * Class loader allowing explicitly load classes without delegation to parent class loader. - * Also supports class parsing for finding dependencies which contain transitive dependencies - * unavailable for parent. - */ -public class HadoopClassLoader extends URLClassLoader implements ClassCache { - static { - // We are very parallel capable. - registerAsParallelCapable(); - } - - /** Name of the Hadoop daemon class. */ - public static final String HADOOP_DAEMON_CLASS_NAME = "org.apache.hadoop.util.Daemon"; - - /** Name of libhadoop library. */ - private static final String LIBHADOOP = "hadoop."; - - /** */ - private static final URLClassLoader APP_CLS_LDR = (URLClassLoader)HadoopClassLoader.class.getClassLoader(); - - /** */ - private static final Collection<URL> appJars = F.asList(APP_CLS_LDR.getURLs()); - - /** */ - private static volatile Collection<URL> hadoopJars; - - /** */ - private static final Map<String, Boolean> cache = new ConcurrentHashMap8<>(); - - /** */ - private static final Map<String, byte[]> bytesCache = new ConcurrentHashMap8<>(); - - /** Class cache. */ - private final ConcurrentMap<String, Class> cacheMap = new ConcurrentHashMap<>(); - - /** Diagnostic name of this class loader. */ - @SuppressWarnings({"FieldCanBeLocal", "UnusedDeclaration"}) - private final String name; - - /** Native library names. */ - private final String[] libNames; - - /** - * Gets name for Job class loader. The name is specific for local node id. - * @param locNodeId The local node id. - * @return The class loader name. - */ - public static String nameForJob(UUID locNodeId) { - return "hadoop-job-node-" + locNodeId.toString(); - } - - /** - * Gets name for the task class loader. Task class loader - * @param info The task info. - * @param prefix Get only prefix (without task type and number) - * @return The class loader name. - */ - public static String nameForTask(HadoopTaskInfo info, boolean prefix) { - if (prefix) - return "hadoop-task-" + info.jobId() + "-"; - else - return "hadoop-task-" + info.jobId() + "-" + info.type() + "-" + info.taskNumber(); - } - - /** - * Constructor. - * - * @param urls Urls. - * @param name Classloader name. - * @param libNames Optional additional native library names to be linked from parent classloader. - */ - public HadoopClassLoader(URL[] urls, String name, @Nullable String[] libNames) { - super(addHadoopUrls(urls), APP_CLS_LDR); - - assert !(getParent() instanceof HadoopClassLoader); - - this.name = name; - this.libNames = libNames; - - initializeNativeLibraries(); - } - - /** - * Workaround to load native Hadoop libraries. Java doesn't allow native libraries to be loaded from different - * classloaders. But we load Hadoop classes many times and one of these classes - {@code NativeCodeLoader} - tries - * to load the same native library over and over again. - * <p> - * To fix the problem, we force native library load in parent class loader and then "link" handle to this native - * library to our class loader. As a result, our class loader will think that the library is already loaded and will - * be able to link native methods. - * - * @see <a href="http://docs.oracle.com/javase/1.5.0/docs/guide/jni/spec/invocation.html#library_version"> - * JNI specification</a> - */ - private void initializeNativeLibraries() { - try { - // This must trigger native library load. - Class.forName(NativeCodeLoader.class.getName(), true, APP_CLS_LDR); - - final Vector<Object> curVector = U.field(this, "nativeLibraries"); - - ClassLoader ldr = APP_CLS_LDR; - - while (ldr != null) { - Vector vector = U.field(ldr, "nativeLibraries"); - - for (Object lib : vector) { - String name = U.field(lib, "name"); - - boolean add = name.contains(LIBHADOOP); - - if (!add && libNames != null) { - for (String libName : libNames) { - if (libName != null && name.contains(libName)) { - add = true; - - break; - } - } - } - - if (add) { - curVector.add(lib); - - return; - } - } - - ldr = ldr.getParent(); - } - } - catch (Exception e) { - U.quietAndWarn(null, "Failed to initialize Hadoop native library " + - "(native Hadoop methods might not work properly): " + e); - } - } - - /** - * Need to parse only Ignite Hadoop and IGFS classes. - * - * @param cls Class name. - * @return {@code true} if we need to check this class. - */ - private static boolean isHadoopIgfs(String cls) { - String ignitePkgPrefix = "org.apache.ignite"; - - int len = ignitePkgPrefix.length(); - - return cls.startsWith(ignitePkgPrefix) && ( - cls.indexOf("igfs.", len) != -1 || - cls.indexOf(".fs.", len) != -1 || - cls.indexOf("hadoop.", len) != -1); - } - - /** - * @param cls Class name. - * @return {@code true} If this is Hadoop class. - */ - private static boolean isHadoop(String cls) { - return cls.startsWith("org.apache.hadoop."); - } - - /** {@inheritDoc} */ - @Override protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException { - try { - if (isHadoop(name)) { // Always load Hadoop classes explicitly, since Hadoop can be available in App classpath. - if (name.endsWith(".util.ShutdownHookManager")) // Dirty hack to get rid of Hadoop shutdown hooks. - return loadFromBytes(name, HadoopShutdownHookManager.class.getName()); - else if (name.equals(HADOOP_DAEMON_CLASS_NAME)) - // We replace this in order to be able to forcibly stop some daemon threads - // that otherwise never stop (e.g. PeerCache runnables): - return loadFromBytes(name, HadoopDaemon.class.getName()); - - return loadClassExplicitly(name, resolve); - } - - if (isHadoopIgfs(name)) { // For Ignite Hadoop and IGFS classes we have to check if they depend on Hadoop. - Boolean hasDeps = cache.get(name); - - if (hasDeps == null) { - hasDeps = hasExternalDependencies(name); - - cache.put(name, hasDeps); - } - - if (hasDeps) - return loadClassExplicitly(name, resolve); - } - - return super.loadClass(name, resolve); - } - catch (NoClassDefFoundError | ClassNotFoundException e) { - throw new ClassNotFoundException("Failed to load class: " + name, e); - } - } - - /** - * @param name Name. - * @param replace Replacement. - * @return Class. - */ - private Class<?> loadFromBytes(final String name, final String replace) { - synchronized (getClassLoadingLock(name)) { - // First, check if the class has already been loaded - Class c = findLoadedClass(name); - - if (c != null) - return c; - - byte[] bytes = bytesCache.get(name); - - if (bytes == null) { - InputStream in = loadClassBytes(getParent(), replace); - - ClassReader rdr; - - try { - rdr = new ClassReader(in); - } - catch (IOException e) { - throw new RuntimeException(e); - } - - ClassWriter w = new ClassWriter(Opcodes.ASM4); - - rdr.accept(new RemappingClassAdapter(w, new Remapper() { - /** */ - String replaceType = replace.replace('.', '/'); - - /** */ - String nameType = name.replace('.', '/'); - - @Override public String map(String type) { - if (type.equals(replaceType)) - return nameType; - - return type; - } - }), ClassReader.EXPAND_FRAMES); - - bytes = w.toByteArray(); - - bytesCache.put(name, bytes); - } - - return defineClass(name, bytes, 0, bytes.length); - } - } - - /** {@inheritDoc} */ - @Override public Class<?> getFromCache(String clsName) throws ClassNotFoundException { - Class<?> cls = cacheMap.get(clsName); - - if (cls == null) { - Class old = cacheMap.putIfAbsent(clsName, cls = Class.forName(clsName, true, this)); - - if (old != null) - cls = old; - } - - return cls; - } - - /** - * @param name Class name. - * @param resolve Resolve class. - * @return Class. - * @throws ClassNotFoundException If failed. - */ - private Class<?> loadClassExplicitly(String name, boolean resolve) throws ClassNotFoundException { - synchronized (getClassLoadingLock(name)) { - // First, check if the class has already been loaded - Class c = findLoadedClass(name); - - if (c == null) { - long t1 = System.nanoTime(); - - c = findClass(name); - - // this is the defining class loader; record the stats - sun.misc.PerfCounter.getFindClassTime().addElapsedTimeFrom(t1); - sun.misc.PerfCounter.getFindClasses().increment(); - } - - if (resolve) - resolveClass(c); - - return c; - } - } - - /** - * @param ldr Loader. - * @param clsName Class. - * @return Input stream. - */ - @Nullable private InputStream loadClassBytes(ClassLoader ldr, String clsName) { - return ldr.getResourceAsStream(clsName.replace('.', '/') + ".class"); - } - - /** - * Check whether class has external dependencies on Hadoop. - * - * @param clsName Class name. - * @return {@code True} if class has external dependencies. - */ - boolean hasExternalDependencies(String clsName) { - CollectingContext ctx = new CollectingContext(); - - ctx.annVisitor = new CollectingAnnotationVisitor(ctx); - ctx.mthdVisitor = new CollectingMethodVisitor(ctx, ctx.annVisitor); - ctx.fldVisitor = new CollectingFieldVisitor(ctx, ctx.annVisitor); - ctx.clsVisitor = new CollectingClassVisitor(ctx, ctx.annVisitor, ctx.mthdVisitor, ctx.fldVisitor); - - return hasExternalDependencies(clsName, ctx); - } - - /** - * Check whether class has external dependencies on Hadoop. - * - * @param clsName Class name. - * @param ctx Context. - * @return {@code true} If the class has external dependencies. - */ - boolean hasExternalDependencies(String clsName, CollectingContext ctx) { - if (isHadoop(clsName)) // Hadoop must not be in classpath but Idea sucks, so filtering explicitly as external. - return true; - - // Try to get from parent to check if the type accessible. - InputStream in = loadClassBytes(getParent(), clsName); - - if (in == null) // The class is external itself, it must be loaded from this class loader. - return true; - - if (!isHadoopIgfs(clsName)) // Other classes should not have external dependencies. - return false; - - final ClassReader rdr; - - try { - rdr = new ClassReader(in); - } - catch (IOException e) { - throw new RuntimeException("Failed to read class: " + clsName, e); - } - - ctx.visited.add(clsName); - - rdr.accept(ctx.clsVisitor, 0); - - if (ctx.found) // We already know that we have dependencies, no need to check parent. - return true; - - // Here we are known to not have any dependencies but possibly we have a parent which has them. - int idx = clsName.lastIndexOf('$'); - - if (idx == -1) // No parent class. - return false; - - String parentCls = clsName.substring(0, idx); - - if (ctx.visited.contains(parentCls)) - return false; - - Boolean res = cache.get(parentCls); - - if (res == null) - res = hasExternalDependencies(parentCls, ctx); - - return res; - } - - /** - * @param name Class name. - * @return {@code true} If this is a valid class name. - */ - private static boolean validateClassName(String name) { - int len = name.length(); - - if (len <= 1) - return false; - - if (!Character.isJavaIdentifierStart(name.charAt(0))) - return false; - - boolean hasDot = false; - - for (int i = 1; i < len; i++) { - char c = name.charAt(i); - - if (c == '.') - hasDot = true; - else if (!Character.isJavaIdentifierPart(c)) - return false; - } - - return hasDot; - } - - /** - * @param urls URLs. - * @return URLs. - */ - private static URL[] addHadoopUrls(URL[] urls) { - Collection<URL> hadoopJars; - - try { - hadoopJars = hadoopUrls(); - } - catch (IgniteCheckedException e) { - throw new RuntimeException(e); - } - - ArrayList<URL> list = new ArrayList<>(hadoopJars.size() + appJars.size() + (urls == null ? 0 : urls.length)); - - list.addAll(appJars); - list.addAll(hadoopJars); - - if (!F.isEmpty(urls)) - list.addAll(F.asList(urls)); - - return list.toArray(new URL[list.size()]); - } - - /** - * @return Collection of jar URLs. - * @throws IgniteCheckedException If failed. - */ - public static Collection<URL> hadoopUrls() throws IgniteCheckedException { - Collection<URL> hadoopUrls = hadoopJars; - - if (hadoopUrls != null) - return hadoopUrls; - - synchronized (HadoopClassLoader.class) { - hadoopUrls = hadoopJars; - - if (hadoopUrls != null) - return hadoopUrls; - - try { - hadoopUrls = HadoopClasspathUtils.classpathForClassLoader(); - } - catch (IOException e) { - throw new IgniteCheckedException("Failed to resolve Hadoop JAR locations: " + e.getMessage(), e); - } - - hadoopJars = hadoopUrls; - - return hadoopUrls; - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(HadoopClassLoader.class, this); - } - - /** - * Getter for name field. - */ - public String name() { - return name; - } - - /** - * Context for dependencies collection. - */ - private class CollectingContext { - /** Visited classes. */ - private final Set<String> visited = new HashSet<>(); - - /** Whether dependency found. */ - private boolean found; - - /** Annotation visitor. */ - private AnnotationVisitor annVisitor; - - /** Method visitor. */ - private MethodVisitor mthdVisitor; - - /** Field visitor. */ - private FieldVisitor fldVisitor; - - /** Class visitor. */ - private ClassVisitor clsVisitor; - - /** - * Processes a method descriptor - * @param methDesc The method desc String. - */ - void onMethodsDesc(final String methDesc) { - // Process method return type: - onType(Type.getReturnType(methDesc)); - - if (found) - return; - - // Process method argument types: - for (Type t: Type.getArgumentTypes(methDesc)) { - onType(t); - - if (found) - return; - } - } - - /** - * Processes dependencies of a class. - * - * @param depCls The class name as dot-notated FQN. - */ - void onClass(final String depCls) { - assert depCls.indexOf('/') == -1 : depCls; // class name should be fully converted to dot notation. - assert depCls.charAt(0) != 'L' : depCls; - assert validateClassName(depCls) : depCls; - - if (depCls.startsWith("java.") || depCls.startsWith("javax.")) // Filter out platform classes. - return; - - if (visited.contains(depCls)) - return; - - Boolean res = cache.get(depCls); - - if (res == Boolean.TRUE || (res == null && hasExternalDependencies(depCls, this))) - found = true; - } - - /** - * Analyses dependencies of given type. - * - * @param t The type to process. - */ - void onType(Type t) { - if (t == null) - return; - - int sort = t.getSort(); - - switch (sort) { - case Type.ARRAY: - onType(t.getElementType()); - - break; - - case Type.OBJECT: - onClass(t.getClassName()); - - break; - } - } - - /** - * Analyses dependencies of given object type. - * - * @param objType The object type to process. - */ - void onInternalTypeName(String objType) { - if (objType == null) - return; - - assert objType.length() > 1 : objType; - - if (objType.charAt(0) == '[') - // handle array. In this case this is a type descriptor notation, like "[Ljava/lang/Object;" - onType(objType); - else { - assert objType.indexOf('.') == -1 : objType; // Must be slash-separated FQN. - - String clsName = objType.replace('/', '.'); // Convert it to dot notation. - - onClass(clsName); // Process. - } - } - - /** - * Type description analyser. - * - * @param desc The description. - */ - void onType(String desc) { - if (!F.isEmpty(desc)) { - if (desc.length() <= 1) - return; // Optimization: filter out primitive types in early stage. - - Type t = Type.getType(desc); - - onType(t); - } - } - } - - /** - * Annotation visitor. - */ - private static class CollectingAnnotationVisitor extends AnnotationVisitor { - /** */ - final CollectingContext ctx; - - /** - * Annotation visitor. - * - * @param ctx The collector. - */ - CollectingAnnotationVisitor(CollectingContext ctx) { - super(Opcodes.ASM4); - - this.ctx = ctx; - } - - /** {@inheritDoc} */ - @Override public AnnotationVisitor visitAnnotation(String name, String desc) { - if (ctx.found) - return null; - - ctx.onType(desc); - - return this; - } - - /** {@inheritDoc} */ - @Override public void visitEnum(String name, String desc, String val) { - if (ctx.found) - return; - - ctx.onType(desc); - } - - /** {@inheritDoc} */ - @Override public AnnotationVisitor visitArray(String name) { - return ctx.found ? null : this; - } - - /** {@inheritDoc} */ - @Override public void visit(String name, Object val) { - if (ctx.found) - return; - - if (val instanceof Type) - ctx.onType((Type)val); - } - - /** {@inheritDoc} */ - @Override public void visitEnd() { - // No-op. - } - } - - /** - * Field visitor. - */ - private static class CollectingFieldVisitor extends FieldVisitor { - /** Collector. */ - private final CollectingContext ctx; - - /** Annotation visitor. */ - private final AnnotationVisitor av; - - /** - * Constructor. - */ - CollectingFieldVisitor(CollectingContext ctx, AnnotationVisitor av) { - super(Opcodes.ASM4); - - this.ctx = ctx; - this.av = av; - } - - /** {@inheritDoc} */ - @Override public AnnotationVisitor visitAnnotation(String desc, boolean visible) { - if (ctx.found) - return null; - - ctx.onType(desc); - - return ctx.found ? null : av; - } - - /** {@inheritDoc} */ - @Override public void visitAttribute(Attribute attr) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void visitEnd() { - // No-op. - } - } - - /** - * Class visitor. - */ - private static class CollectingClassVisitor extends ClassVisitor { - /** Collector. */ - private final CollectingContext ctx; - - /** Annotation visitor. */ - private final AnnotationVisitor av; - - /** Method visitor. */ - private final MethodVisitor mv; - - /** Field visitor. */ - private final FieldVisitor fv; - - /** - * Constructor. - * - * @param ctx Collector. - * @param av Annotation visitor. - * @param mv Method visitor. - * @param fv Field visitor. - */ - CollectingClassVisitor(CollectingContext ctx, AnnotationVisitor av, MethodVisitor mv, FieldVisitor fv) { - super(Opcodes.ASM4); - - this.ctx = ctx; - this.av = av; - this.mv = mv; - this.fv = fv; - } - - /** {@inheritDoc} */ - @Override public void visit(int i, int i2, String name, String signature, String superName, String[] ifaces) { - if (ctx.found) - return; - - ctx.onInternalTypeName(superName); - - if (ctx.found) - return; - - if (ifaces != null) { - for (String iface : ifaces) { - ctx.onInternalTypeName(iface); - - if (ctx.found) - return; - } - } - } - - /** {@inheritDoc} */ - @Override public AnnotationVisitor visitAnnotation(String desc, boolean visible) { - if (ctx.found) - return null; - - ctx.onType(desc); - - return ctx.found ? null : av; - } - - /** {@inheritDoc} */ - @Override public void visitInnerClass(String name, String outerName, String innerName, int i) { - if (ctx.found) - return; - - ctx.onInternalTypeName(name); - } - - /** {@inheritDoc} */ - @Override public FieldVisitor visitField(int i, String name, String desc, String signature, Object val) { - if (ctx.found) - return null; - - ctx.onType(desc); - - return ctx.found ? null : fv; - } - - /** {@inheritDoc} */ - @Override public MethodVisitor visitMethod(int i, String name, String desc, String signature, - String[] exceptions) { - if (ctx.found) - return null; - - ctx.onMethodsDesc(desc); - - // Process declared method exceptions: - if (exceptions != null) { - for (String e : exceptions) - ctx.onInternalTypeName(e); - } - - return ctx.found ? null : mv; - } - } - - /** - * Method visitor. - */ - private static class CollectingMethodVisitor extends MethodVisitor { - /** Collector. */ - private final CollectingContext ctx; - - /** Annotation visitor. */ - private final AnnotationVisitor av; - - /** - * Constructor. - * - * @param ctx Collector. - * @param av Annotation visitor. - */ - private CollectingMethodVisitor(CollectingContext ctx, AnnotationVisitor av) { - super(Opcodes.ASM4); - - this.ctx = ctx; - this.av = av; - } - - /** {@inheritDoc} */ - @Override public AnnotationVisitor visitAnnotation(String desc, boolean visible) { - if (ctx.found) - return null; - - ctx.onType(desc); - - return ctx.found ? null : av; - } - - /** {@inheritDoc} */ - @Override public AnnotationVisitor visitParameterAnnotation(int i, String desc, boolean b) { - if (ctx.found) - return null; - - ctx.onType(desc); - - return ctx.found ? null : av; - } - - /** {@inheritDoc} */ - @Override public AnnotationVisitor visitAnnotationDefault() { - return ctx.found ? null : av; - } - - /** {@inheritDoc} */ - @Override public void visitFieldInsn(int opcode, String owner, String name, String desc) { - if (ctx.found) - return; - - ctx.onInternalTypeName(owner); - - if (ctx.found) - return; - - ctx.onType(desc); - } - - /** {@inheritDoc} */ - @Override public void visitInvokeDynamicInsn(String name, String desc, Handle bsm, Object... bsmArgs) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void visitFrame(int type, int nLoc, Object[] locTypes, int nStack, Object[] stackTypes) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void visitLocalVariable(String name, String desc, String signature, Label lb, - Label lb2, int i) { - if (ctx.found) - return; - - ctx.onType(desc); - } - - /** {@inheritDoc} */ - @Override public void visitMethodInsn(int i, String owner, String name, String desc) { - if (ctx.found) - return; - - ctx.onInternalTypeName(owner); - - if (ctx.found) - return; - - ctx.onMethodsDesc(desc); - } - - /** {@inheritDoc} */ - @Override public void visitMultiANewArrayInsn(String desc, int dim) { - if (ctx.found) - return; - - ctx.onType(desc); - } - - /** {@inheritDoc} */ - @Override public void visitTryCatchBlock(Label start, Label end, Label hndl, String typeStr) { - if (ctx.found) - return; - - ctx.onInternalTypeName(typeStr); - } - - /** {@inheritDoc} */ - @Override public void visitTypeInsn(int opcode, String type) { - if (ctx.found) - return; - - ctx.onInternalTypeName(type); - } - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClasspathMain.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClasspathMain.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClasspathMain.java deleted file mode 100644 index 4069496..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClasspathMain.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -/** - * Main class to compose Hadoop classpath depending on the environment. - * This class is designed to be independent on any Ignite classes as possible. - * Please make sure to pass the path separator character as the 1st parameter to the main method. - */ -public class HadoopClasspathMain { - /** - * Main method to be executed from scripts. It prints the classpath to the standard output. - * - * @param args The 1st argument should be the path separator character (":" on Linux, ";" on Windows). - */ - public static void main(String[] args) throws Exception { - if (args.length < 1) - throw new IllegalArgumentException("Path separator must be passed as the first argument."); - - String separator = args[0]; - - StringBuilder sb = new StringBuilder(); - - for (String path : HadoopClasspathUtils.classpathForProcess()) - sb.append(path).append(separator); - - System.out.println(sb); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClasspathUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClasspathUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClasspathUtils.java deleted file mode 100644 index f5c2814..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClasspathUtils.java +++ /dev/null @@ -1,461 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import java.io.File; -import java.io.FilenameFilter; -import java.io.IOException; -import java.net.MalformedURLException; -import java.net.URL; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.Collection; -import java.util.LinkedList; -import java.util.List; - -/** - * Hadoop classpath utilities. - */ -public class HadoopClasspathUtils { - /** Prefix directory. */ - public static final String PREFIX = "HADOOP_PREFIX"; - - /** Hadoop home directory. */ - public static final String HOME = "HADOOP_HOME"; - - /** Hadoop common directory. */ - public static final String COMMON_HOME = "HADOOP_COMMON_HOME"; - - /** Hadoop HDFS directory. */ - public static final String HDFS_HOME = "HADOOP_HDFS_HOME"; - - /** Hadoop mapred directory. */ - public static final String MAPRED_HOME = "HADOOP_MAPRED_HOME"; - - /** Arbitrary additional dependencies. Compliant with standard Java classpath resolution. */ - public static final String HADOOP_USER_LIBS = "HADOOP_USER_LIBS"; - - /** Empty string. */ - private static final String EMPTY_STR = ""; - - /** - * Gets Hadoop class path as list of classpath elements for process. - * - * @return List of the class path elements. - * @throws IOException If failed. - */ - public static List<String> classpathForProcess() throws IOException { - List<String> res = new ArrayList<>(); - - for (final SearchDirectory dir : classpathDirectories()) { - File[] files = dir.files(); - - if (dir.useWildcard()) { - if (files.length > 0) - res.add(dir.absolutePath() + File.separator + '*'); - } - else { - for (File file : files) - res.add(file.getAbsolutePath()); - } - } - - return res; - } - - /** - * Gets Hadoop class path as a list of URLs (for in-process class loader usage). - * - * @return List of class path URLs. - * @throws IOException If failed. - */ - public static List<URL> classpathForClassLoader() throws IOException { - List<URL> res = new ArrayList<>(); - - for (SearchDirectory dir : classpathDirectories()) { - for (File file : dir.files()) { - try { - res.add(file.toURI().toURL()); - } - catch (MalformedURLException e) { - throw new IOException("Failed to convert file path to URL: " + file.getPath()); - } - } - } - - return res; - } - - /** - * Gets Hadoop locations. - * - * @return The locations as determined from the environment. - */ - public static HadoopLocations locations() throws IOException { - // Query environment. - String hadoopHome = systemOrEnv(PREFIX, systemOrEnv(HOME, EMPTY_STR)); - - String commonHome = systemOrEnv(COMMON_HOME, EMPTY_STR); - String hdfsHome = systemOrEnv(HDFS_HOME, EMPTY_STR); - String mapredHome = systemOrEnv(MAPRED_HOME, EMPTY_STR); - - // If any composite location is defined, use only them. - if (!isEmpty(commonHome) || !isEmpty(hdfsHome) || !isEmpty(mapredHome)) { - HadoopLocations res = new HadoopLocations(hadoopHome, commonHome, hdfsHome, mapredHome); - - if (res.valid()) - return res; - else - throw new IOException("Failed to resolve Hadoop classpath because some environment variables are " + - "either undefined or point to nonexistent directories [" + - "[env=" + COMMON_HOME + ", value=" + commonHome + ", exists=" + res.commonExists() + "], " + - "[env=" + HDFS_HOME + ", value=" + hdfsHome + ", exists=" + res.hdfsExists() + "], " + - "[env=" + MAPRED_HOME + ", value=" + mapredHome + ", exists=" + res.mapredExists() + "]]"); - } - else if (!isEmpty(hadoopHome)) { - // All further checks will be based on HADOOP_HOME, so check for it's existence. - if (!exists(hadoopHome)) - throw new IOException("Failed to resolve Hadoop classpath because " + HOME + " environment " + - "variable points to nonexistent directory: " + hadoopHome); - - // Probe Apache Hadoop. - HadoopLocations res = new HadoopLocations( - hadoopHome, - hadoopHome + "/share/hadoop/common", - hadoopHome + "/share/hadoop/hdfs", - hadoopHome + "/share/hadoop/mapreduce" - ); - - if (res.valid()) - return res; - - // Probe CDH. - res = new HadoopLocations( - hadoopHome, - hadoopHome, - hadoopHome + "/../hadoop-hdfs", - hadoopHome + "/../hadoop-mapreduce" - ); - - if (res.valid()) - return res; - - // Probe HDP. - res = new HadoopLocations( - hadoopHome, - hadoopHome, - hadoopHome + "/../hadoop-hdfs-client", - hadoopHome + "/../hadoop-mapreduce-client" - ); - - if (res.valid()) - return res; - - // Failed. - throw new IOException("Failed to resolve Hadoop classpath because " + HOME + " environment variable " + - "is either invalid or points to non-standard Hadoop distribution: " + hadoopHome); - } - else { - // Advise to set HADOOP_HOME only as this is preferred way to configure classpath. - throw new IOException("Failed to resolve Hadoop classpath (please define " + HOME + " environment " + - "variable and point it to your Hadoop distribution)."); - } - } - - /** - * Gets base directories to discover classpath elements in. - * - * @return Collection of directory and mask pairs. - * @throws IOException if a mandatory classpath location is not found. - */ - private static Collection<SearchDirectory> classpathDirectories() throws IOException { - HadoopLocations loc = locations(); - - Collection<SearchDirectory> res = new ArrayList<>(); - - res.add(new SearchDirectory(new File(loc.common(), "lib"), AcceptAllDirectoryFilter.INSTANCE)); - res.add(new SearchDirectory(new File(loc.hdfs(), "lib"), AcceptAllDirectoryFilter.INSTANCE)); - res.add(new SearchDirectory(new File(loc.mapred(), "lib"), AcceptAllDirectoryFilter.INSTANCE)); - - res.add(new SearchDirectory(new File(loc.common()), new PrefixDirectoryFilter("hadoop-common-"))); - res.add(new SearchDirectory(new File(loc.common()), new PrefixDirectoryFilter("hadoop-auth-"))); - - res.add(new SearchDirectory(new File(loc.hdfs()), new PrefixDirectoryFilter("hadoop-hdfs-"))); - - res.add(new SearchDirectory(new File(loc.mapred()), - new PrefixDirectoryFilter("hadoop-mapreduce-client-common"))); - res.add(new SearchDirectory(new File(loc.mapred()), - new PrefixDirectoryFilter("hadoop-mapreduce-client-core"))); - - res.addAll(parseUserLibs()); - - return res; - } - - /** - * Parse user libs. - * - * @return Parsed libs search patterns. - * @throws IOException If failed. - */ - static Collection<SearchDirectory> parseUserLibs() throws IOException { - return parseUserLibs(systemOrEnv(HADOOP_USER_LIBS, null)); - } - - /** - * Parse user libs. - * - * @param str String. - * @return Result. - * @throws IOException If failed. - */ - static Collection<SearchDirectory> parseUserLibs(String str) throws IOException { - Collection<SearchDirectory> res = new LinkedList<>(); - - if (!isEmpty(str)) { - String[] tokens = normalize(str).split(File.pathSeparator); - - for (String token : tokens) { - // Skip empty tokens. - if (isEmpty(token)) - continue; - - File file = new File(token); - File dir = file.getParentFile(); - - if (token.endsWith("*")) { - assert dir != null; - - res.add(new SearchDirectory(dir, AcceptAllDirectoryFilter.INSTANCE, false)); - } - else { - // Met "/" or "C:\" pattern, nothing to do with it. - if (dir == null) - continue; - - res.add(new SearchDirectory(dir, new ExactDirectoryFilter(file.getName()), false)); - } - } - } - - return res; - } - - /** - * Get system property or environment variable with the given name. - * - * @param name Variable name. - * @param dflt Default value. - * @return Value. - */ - private static String systemOrEnv(String name, String dflt) { - String res = System.getProperty(name); - - if (res == null) - res = System.getenv(name); - - return res != null ? res : dflt; - } - - /** - * Answers if the given path denotes existing directory. - * - * @param path The directory path. - * @return {@code True} if the given path denotes an existing directory. - */ - public static boolean exists(String path) { - if (path == null) - return false; - - Path p = Paths.get(path); - - return Files.exists(p) && Files.isDirectory(p) && Files.isReadable(p); - } - - /** - * Check if string is empty. - * - * @param val Value. - * @return {@code True} if empty. - */ - private static boolean isEmpty(String val) { - return val == null || val.isEmpty(); - } - - /** - * NOramlize the string. - * - * @param str String. - * @return Normalized string. - */ - private static String normalize(String str) { - assert str != null; - - return str.trim().toLowerCase(); - } - - /** - * Simple pair-like structure to hold directory name and a mask assigned to it. - */ - static class SearchDirectory { - /** File. */ - private final File dir; - - /** Filter. */ - private final DirectoryFilter filter; - - /** Whether directory must exist. */ - private final boolean strict; - - /** - * Constructor for directory search with strict rule. - * - * @param dir Directory. - * @param filter Filter. - * @throws IOException If failed. - */ - private SearchDirectory(File dir, DirectoryFilter filter) throws IOException { - this(dir, filter, true); - } - - /** - * Constructor. - * - * @param dir Directory. - * @param filter Filter. - * @param strict Whether directory must exist. - * @throws IOException If failed. - */ - private SearchDirectory(File dir, DirectoryFilter filter, boolean strict) throws IOException { - this.dir = dir; - this.filter = filter; - this.strict = strict; - - if (strict && !exists(dir.getAbsolutePath())) - throw new IOException("Directory cannot be read: " + dir.getAbsolutePath()); - } - - /** - * @return Absolute path. - */ - String absolutePath() { - return dir.getAbsolutePath(); - } - - /** - * @return Child files. - */ - File[] files() throws IOException { - File[] files = dir.listFiles(new FilenameFilter() { - @Override public boolean accept(File dir, String name) { - return filter.test(name); - } - }); - - if (files == null) { - if (strict) - throw new IOException("Failed to get directory files [dir=" + dir + ']'); - else - return new File[0]; - } - else - return files; - } - - /** - * @return {@code True} if wildcard can be used. - */ - boolean useWildcard() { - return filter instanceof AcceptAllDirectoryFilter; - } - } - - /** - * Directory filter interface. - */ - static interface DirectoryFilter { - /** - * Test if file with this name should be included. - * - * @param name File name. - * @return {@code True} if passed. - */ - public boolean test(String name); - } - - /** - * Filter to accept all files. - */ - static class AcceptAllDirectoryFilter implements DirectoryFilter { - /** Singleton instance. */ - public static final AcceptAllDirectoryFilter INSTANCE = new AcceptAllDirectoryFilter(); - - /** {@inheritDoc} */ - @Override public boolean test(String name) { - return true; - } - } - - /** - * Filter which uses prefix to filter files. - */ - static class PrefixDirectoryFilter implements DirectoryFilter { - /** Prefix. */ - private final String prefix; - - /** - * Constructor. - * - * @param prefix Prefix. - */ - public PrefixDirectoryFilter(String prefix) { - assert prefix != null; - - this.prefix = normalize(prefix); - } - - /** {@inheritDoc} */ - @Override public boolean test(String name) { - return normalize(name).startsWith(prefix); - } - } - - /** - * Filter which uses exact comparison. - */ - static class ExactDirectoryFilter implements DirectoryFilter { - /** Name. */ - private final String name; - - /** - * Constructor. - * - * @param name Name. - */ - public ExactDirectoryFilter(String name) { - this.name = normalize(name); - } - - /** {@inheritDoc} */ - @Override public boolean test(String name) { - return normalize(name).equals(this.name); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java new file mode 100644 index 0000000..37af147 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.jetbrains.annotations.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.TreeSet; + +/** + * Common Hadoop utility methods which do not depend on Hadoop API. + */ +public class HadoopCommonUtils { + /** Job class name. */ + public static final String JOB_CLS_NAME = "org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopV2Job"; + + /** Property to store timestamp of new job id request. */ + public static final String REQ_NEW_JOBID_TS_PROPERTY = "ignite.job.requestNewIdTs"; + + /** Property to store timestamp of response of new job id request. */ + public static final String RESPONSE_NEW_JOBID_TS_PROPERTY = "ignite.job.responseNewIdTs"; + + /** Property to store timestamp of job submission. */ + public static final String JOB_SUBMISSION_START_TS_PROPERTY = "ignite.job.submissionStartTs"; + + /** Property to set custom writer of job statistics. */ + public static final String JOB_COUNTER_WRITER_PROPERTY = "ignite.counters.writer"; + + /** + * Sort input splits by length. + * + * @param splits Splits. + * @return Sorted splits. + */ + public static List<HadoopInputSplit> sortInputSplits(Collection<HadoopInputSplit> splits) { + int id = 0; + + TreeSet<SplitSortWrapper> sortedSplits = new TreeSet<>(); + + for (HadoopInputSplit split : splits) { + long len = split instanceof HadoopFileBlock ? ((HadoopFileBlock)split).length() : 0; + + sortedSplits.add(new SplitSortWrapper(id++, split, len)); + } + + ArrayList<HadoopInputSplit> res = new ArrayList<>(sortedSplits.size()); + + for (SplitSortWrapper sortedSplit : sortedSplits) + res.add(sortedSplit.split); + + return res; + } + + /** + * Set context class loader. + * + * @param newLdr New class loader. + * @return Old class loader. + */ + @Nullable public static ClassLoader setContextClassLoader(@Nullable ClassLoader newLdr) { + ClassLoader oldLdr = Thread.currentThread().getContextClassLoader(); + + if (newLdr != oldLdr) + Thread.currentThread().setContextClassLoader(newLdr); + + return oldLdr; + } + + /** + * Restore context class loader. + * + * @param oldLdr Original class loader. + */ + public static void restoreContextClassLoader(@Nullable ClassLoader oldLdr) { + ClassLoader newLdr = Thread.currentThread().getContextClassLoader(); + + if (newLdr != oldLdr) + Thread.currentThread().setContextClassLoader(oldLdr); + } + + /** + * Split wrapper for sorting. + */ + private static class SplitSortWrapper implements Comparable<SplitSortWrapper> { + /** Unique ID. */ + private final int id; + + /** Split. */ + private final HadoopInputSplit split; + + /** Split length. */ + private final long len; + + /** + * Constructor. + * + * @param id Unique ID. + * @param split Split. + * @param len Split length. + */ + public SplitSortWrapper(int id, HadoopInputSplit split, long len) { + this.id = id; + this.split = split; + this.len = len; + } + + /** {@inheritDoc} */ + @SuppressWarnings("NullableProblems") + @Override public int compareTo(SplitSortWrapper other) { + long res = len - other.len; + + if (res > 0) + return -1; + else if (res < 0) + return 1; + else + return id - other.id; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return id; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + return obj instanceof SplitSortWrapper && id == ((SplitSortWrapper)obj).id; + } + } + + /** + * Private constructor. + */ + private HadoopCommonUtils() { + // No-op. + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java index 42a3d72..4326ad2 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java @@ -24,7 +24,6 @@ import java.util.UUID; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.HadoopConfiguration; import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobMetadata; import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobTracker; import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffle; http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java deleted file mode 100644 index 1382c1f..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.lang.reflect.Constructor; -import java.util.HashMap; -import java.util.Map; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.jetbrains.annotations.Nullable; - -/** - * Hadoop job info based on default Hadoop configuration. - */ -public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable { - /** */ - private static final long serialVersionUID = 5489900236464999951L; - - /** {@code true} If job has combiner. */ - private boolean hasCombiner; - - /** Number of reducers configured for job. */ - private int numReduces; - - /** Configuration. */ - private Map<String,String> props = new HashMap<>(); - - /** Job name. */ - private String jobName; - - /** User name. */ - private String user; - - /** - * Default constructor required by {@link Externalizable}. - */ - public HadoopDefaultJobInfo() { - // No-op. - } - - /** - * Constructor. - * - * @param jobName Job name. - * @param user User name. - * @param hasCombiner {@code true} If job has combiner. - * @param numReduces Number of reducers configured for job. - * @param props All other properties of the job. - */ - public HadoopDefaultJobInfo(String jobName, String user, boolean hasCombiner, int numReduces, - Map<String, String> props) { - this.jobName = jobName; - this.user = user; - this.hasCombiner = hasCombiner; - this.numReduces = numReduces; - this.props = props; - } - - /** {@inheritDoc} */ - @Nullable @Override public String property(String name) { - return props.get(name); - } - - /** {@inheritDoc} */ - @Override public HadoopJob createJob(Class<? extends HadoopJob> jobCls, HadoopJobId jobId, IgniteLogger log, - @Nullable String[] libNames) throws IgniteCheckedException { - assert jobCls != null; - - try { - Constructor<? extends HadoopJob> constructor = jobCls.getConstructor(HadoopJobId.class, - HadoopDefaultJobInfo.class, IgniteLogger.class, String[].class); - - return constructor.newInstance(jobId, this, log, libNames); - } - catch (Throwable t) { - if (t instanceof Error) - throw (Error)t; - - throw new IgniteCheckedException(t); - } - } - - /** {@inheritDoc} */ - @Override public boolean hasCombiner() { - return hasCombiner; - } - - /** {@inheritDoc} */ - @Override public boolean hasReducer() { - return reducers() > 0; - } - - /** {@inheritDoc} */ - @Override public int reducers() { - return numReduces; - } - - /** {@inheritDoc} */ - @Override public String jobName() { - return jobName; - } - - /** {@inheritDoc} */ - @Override public String user() { - return user; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, jobName); - U.writeString(out, user); - - out.writeBoolean(hasCombiner); - out.writeInt(numReduces); - - U.writeStringMap(out, props); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - jobName = U.readString(in); - user = U.readString(in); - - hasCombiner = in.readBoolean(); - numReduces = in.readInt(); - - props = U.readStringMap(in); - } - - /** - * @return Properties of the job. - */ - public Map<String, String> properties() { - return props; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopExternalSplit.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopExternalSplit.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopExternalSplit.java new file mode 100644 index 0000000..bd767b3 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopExternalSplit.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; + +/** + * Split serialized in external file. + */ +public class HadoopExternalSplit extends HadoopInputSplit { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private long off; + + /** + * For {@link Externalizable}. + */ + public HadoopExternalSplit() { + // No-op. + } + + /** + * @param hosts Hosts. + * @param off Offset of this split in external file. + */ + public HadoopExternalSplit(String[] hosts, long off) { + assert off >= 0 : off; + assert hosts != null; + + this.hosts = hosts; + this.off = off; + } + + /** + * @return Offset of this input split in external file. + */ + public long offset() { + return off; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeLong(off); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + off = in.readLong(); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + HadoopExternalSplit that = (HadoopExternalSplit) o; + + return off == that.off; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return (int)(off ^ (off >>> 32)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopHelperImpl.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopHelperImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopHelperImpl.java new file mode 100644 index 0000000..71bb8a4 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopHelperImpl.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.internal.GridKernalContext; +import org.jetbrains.annotations.Nullable; +import org.objectweb.asm.ClassReader; +import org.objectweb.asm.ClassWriter; +import org.objectweb.asm.Opcodes; +import org.objectweb.asm.commons.Remapper; +import org.objectweb.asm.commons.RemappingClassAdapter; + +import java.io.IOException; +import java.io.InputStream; + +/** + * Utility methods for Hadoop classloader required to avoid direct 3rd-party dependencies in class loader. + */ +public class HadoopHelperImpl implements HadoopHelper { + /** Kernal context. */ + private final GridKernalContext ctx; + + /** Common class loader. */ + private volatile HadoopClassLoader ldr; + + /** + * Default constructor. + */ + public HadoopHelperImpl() { + this(null); + } + + /** + * Constructor. + * + * @param ctx Kernal context. + */ + public HadoopHelperImpl(GridKernalContext ctx) { + this.ctx = ctx; + } + + /** {@inheritDoc} */ + @Override public boolean isNoOp() { + return false; + } + + /** {@inheritDoc} */ + @Override public HadoopClassLoader commonClassLoader() { + HadoopClassLoader res = ldr; + + if (res == null) { + synchronized (this) { + res = ldr; + + if (res == null) { + String[] libNames = null; + + if (ctx != null && ctx.config().getHadoopConfiguration() != null) + libNames = ctx.config().getHadoopConfiguration().getNativeLibraryNames(); + + res = new HadoopClassLoader(null, "hadoop-common", libNames, this); + + ldr = res; + } + } + } + + return res; + } + + /** {@inheritDoc} */ + @Override public byte[] loadReplace(InputStream in, final String originalName, final String replaceName) { + ClassReader rdr; + + try { + rdr = new ClassReader(in); + } + catch (IOException e) { + throw new RuntimeException(e); + } + + ClassWriter w = new ClassWriter(Opcodes.ASM4); + + rdr.accept(new RemappingClassAdapter(w, new Remapper() { + /** */ + String replaceType = replaceName.replace('.', '/'); + + /** */ + String nameType = originalName.replace('.', '/'); + + @Override public String map(String type) { + if (type.equals(replaceType)) + return nameType; + + return type; + } + }), ClassReader.EXPAND_FRAMES); + + return w.toByteArray(); + } + + /** {@inheritDoc} */ + @Override @Nullable public InputStream loadClassBytes(ClassLoader ldr, String clsName) { + return ldr.getResourceAsStream(clsName.replace('.', '/') + ".class"); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopLocations.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopLocations.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopLocations.java deleted file mode 100644 index a90007f..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopLocations.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -/** - * Simple structure to hold Hadoop directory locations. - */ -public class HadoopLocations { - /** Hadoop home. */ - private final String home; - - /** Common home. */ - private final String common; - - /** HDFS home. */ - private final String hdfs; - - /** Mapred home. */ - private final String mapred; - - /** Whether common home exists. */ - private final boolean commonExists; - - /** Whether HDFS home exists. */ - private final boolean hdfsExists; - - /** Whether mapred home exists. */ - private final boolean mapredExists; - - /** - * Constructor. - * - * @param home Hadoop home. - * @param common Common home. - * @param hdfs HDFS home. - * @param mapred Mapred home. - */ - public HadoopLocations(String home, String common, String hdfs, String mapred) { - assert common != null && hdfs != null && mapred != null; - - this.home = home; - this.common = common; - this.hdfs = hdfs; - this.mapred = mapred; - - commonExists = HadoopClasspathUtils.exists(common); - hdfsExists = HadoopClasspathUtils.exists(hdfs); - mapredExists = HadoopClasspathUtils.exists(mapred); - } - - /** - * @return Hadoop home. - */ - public String home() { - return home; - } - - /** - * @return Common home. - */ - public String common() { - return common; - } - - /** - * @return HDFS home. - */ - public String hdfs() { - return hdfs; - } - - /** - * @return Mapred home. - */ - public String mapred() { - return mapred; - } - - /** - * @return Whether common home exists. - */ - public boolean commonExists() { - return commonExists; - } - - /** - * @return Whether HDFS home exists. - */ - public boolean hdfsExists() { - return hdfsExists; - } - - /** - * @return Whether mapred home exists. - */ - public boolean mapredExists() { - return mapredExists; - } - - /** - * Whether all required directories exists. - * - * @return {@code True} if exists. - */ - public boolean valid() { - return commonExists && hdfsExists && mapredExists; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounterGroup.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounterGroup.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounterGroup.java deleted file mode 100644 index 4e03e17..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounterGroup.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.Iterator; -import org.apache.hadoop.mapreduce.Counter; -import org.apache.hadoop.mapreduce.CounterGroup; -import org.apache.hadoop.mapreduce.counters.CounterGroupBase; - -/** - * Hadoop +counter group adapter. - */ -class HadoopMapReduceCounterGroup implements CounterGroup { - /** Counters. */ - private final HadoopMapReduceCounters cntrs; - - /** Group name. */ - private final String name; - - /** - * Creates new instance. - * - * @param cntrs Client counters instance. - * @param name Group name. - */ - HadoopMapReduceCounterGroup(HadoopMapReduceCounters cntrs, String name) { - this.cntrs = cntrs; - this.name = name; - } - - /** {@inheritDoc} */ - @Override public String getName() { - return name; - } - - /** {@inheritDoc} */ - @Override public String getDisplayName() { - return name; - } - - /** {@inheritDoc} */ - @Override public void setDisplayName(String displayName) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void addCounter(Counter counter) { - addCounter(counter.getName(), counter.getDisplayName(), 0); - } - - /** {@inheritDoc} */ - @Override public Counter addCounter(String name, String displayName, long value) { - final Counter counter = cntrs.findCounter(this.name, name); - - counter.setValue(value); - - return counter; - } - - /** {@inheritDoc} */ - @Override public Counter findCounter(String counterName, String displayName) { - return cntrs.findCounter(name, counterName); - } - - /** {@inheritDoc} */ - @Override public Counter findCounter(String counterName, boolean create) { - return cntrs.findCounter(name, counterName, create); - } - - /** {@inheritDoc} */ - @Override public Counter findCounter(String counterName) { - return cntrs.findCounter(name, counterName); - } - - /** {@inheritDoc} */ - @Override public int size() { - return cntrs.groupSize(name); - } - - /** {@inheritDoc} */ - @Override public void incrAllCounters(CounterGroupBase<Counter> rightGroup) { - for (final Counter counter : rightGroup) - cntrs.findCounter(name, counter.getName()).increment(counter.getValue()); - } - - /** {@inheritDoc} */ - @Override public CounterGroupBase<Counter> getUnderlyingGroup() { - return this; - } - - /** {@inheritDoc} */ - @Override public Iterator<Counter> iterator() { - return cntrs.iterateGroup(name); - } - - /** {@inheritDoc} */ - @Override public void write(DataOutput out) throws IOException { - throw new UnsupportedOperationException("not implemented"); - } - - /** {@inheritDoc} */ - @Override public void readFields(DataInput in) throws IOException { - throw new UnsupportedOperationException("not implemented"); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounters.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounters.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounters.java deleted file mode 100644 index 57a853f..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounters.java +++ /dev/null @@ -1,228 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; -import java.util.NoSuchElementException; -import org.apache.hadoop.mapreduce.Counter; -import org.apache.hadoop.mapreduce.CounterGroup; -import org.apache.hadoop.mapreduce.Counters; -import org.apache.hadoop.mapreduce.FileSystemCounter; -import org.apache.hadoop.mapreduce.counters.AbstractCounters; -import org.apache.hadoop.mapreduce.counters.Limits; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounter; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopLongCounter; -import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Counter; -import org.apache.ignite.internal.util.typedef.T2; - -/** - * Hadoop counters adapter. - */ -public class HadoopMapReduceCounters extends Counters { - /** */ - private final Map<T2<String,String>,HadoopLongCounter> cntrs = new HashMap<>(); - - /** - * Creates new instance based on given counters. - * - * @param cntrs Counters to adapt. - */ - public HadoopMapReduceCounters(org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters cntrs) { - for (HadoopCounter cntr : cntrs.all()) - if (cntr instanceof HadoopLongCounter) - this.cntrs.put(new T2<>(cntr.group(), cntr.name()), (HadoopLongCounter) cntr); - } - - /** {@inheritDoc} */ - @Override public synchronized CounterGroup addGroup(CounterGroup grp) { - return addGroup(grp.getName(), grp.getDisplayName()); - } - - /** {@inheritDoc} */ - @Override public CounterGroup addGroup(String name, String displayName) { - return new HadoopMapReduceCounterGroup(this, name); - } - - /** {@inheritDoc} */ - @Override public Counter findCounter(String grpName, String cntrName) { - return findCounter(grpName, cntrName, true); - } - - /** {@inheritDoc} */ - @Override public synchronized Counter findCounter(Enum<?> key) { - return findCounter(key.getDeclaringClass().getName(), key.name(), true); - } - - /** {@inheritDoc} */ - @Override public synchronized Counter findCounter(String scheme, FileSystemCounter key) { - return findCounter(String.format("FileSystem Counter (%s)", scheme), key.name()); - } - - /** {@inheritDoc} */ - @Override public synchronized Iterable<String> getGroupNames() { - Collection<String> res = new HashSet<>(); - - for (HadoopCounter counter : cntrs.values()) - res.add(counter.group()); - - return res; - } - - /** {@inheritDoc} */ - @Override public Iterator<CounterGroup> iterator() { - final Iterator<String> iter = getGroupNames().iterator(); - - return new Iterator<CounterGroup>() { - @Override public boolean hasNext() { - return iter.hasNext(); - } - - @Override public CounterGroup next() { - if (!hasNext()) - throw new NoSuchElementException(); - - return new HadoopMapReduceCounterGroup(HadoopMapReduceCounters.this, iter.next()); - } - - @Override public void remove() { - throw new UnsupportedOperationException("not implemented"); - } - }; - } - - /** {@inheritDoc} */ - @Override public synchronized CounterGroup getGroup(String grpName) { - return new HadoopMapReduceCounterGroup(this, grpName); - } - - /** {@inheritDoc} */ - @Override public synchronized int countCounters() { - return cntrs.size(); - } - - /** {@inheritDoc} */ - @Override public synchronized void write(DataOutput out) throws IOException { - throw new UnsupportedOperationException("not implemented"); - } - - /** {@inheritDoc} */ - @Override public synchronized void readFields(DataInput in) throws IOException { - throw new UnsupportedOperationException("not implemented"); - } - - /** {@inheritDoc} */ - @Override public synchronized void incrAllCounters(AbstractCounters<Counter, CounterGroup> other) { - for (CounterGroup group : other) { - for (Counter counter : group) { - findCounter(group.getName(), counter.getName()).increment(counter.getValue()); - } - } - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object genericRight) { - if (!(genericRight instanceof HadoopMapReduceCounters)) - return false; - - return cntrs.equals(((HadoopMapReduceCounters) genericRight).cntrs); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return cntrs.hashCode(); - } - - /** {@inheritDoc} */ - @Override public void setWriteAllCounters(boolean snd) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public boolean getWriteAllCounters() { - return true; - } - - /** {@inheritDoc} */ - @Override public Limits limits() { - return null; - } - - /** - * Returns size of a group. - * - * @param grpName Name of the group. - * @return amount of counters in the given group. - */ - public int groupSize(String grpName) { - int res = 0; - - for (HadoopCounter counter : cntrs.values()) { - if (grpName.equals(counter.group())) - res++; - } - - return res; - } - - /** - * Returns counters iterator for specified group. - * - * @param grpName Name of the group to iterate. - * @return Counters iterator. - */ - public Iterator<Counter> iterateGroup(String grpName) { - Collection<Counter> grpCounters = new ArrayList<>(); - - for (HadoopLongCounter counter : cntrs.values()) { - if (grpName.equals(counter.group())) - grpCounters.add(new HadoopV2Counter(counter)); - } - - return grpCounters.iterator(); - } - - /** - * Find a counter in the group. - * - * @param grpName The name of the counter group. - * @param cntrName The name of the counter. - * @param create Create the counter if not found if true. - * @return The counter that was found or added or {@code null} if create is false. - */ - public Counter findCounter(String grpName, String cntrName, boolean create) { - T2<String, String> key = new T2<>(grpName, cntrName); - - HadoopLongCounter internalCntr = cntrs.get(key); - - if (internalCntr == null & create) { - internalCntr = new HadoopLongCounter(grpName,cntrName); - - cntrs.put(key, new HadoopLongCounter(grpName,cntrName)); - } - - return internalCntr == null ? null : new HadoopV2Counter(internalCntr); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java index b9c20c3..f0df1e9 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java @@ -40,6 +40,9 @@ import java.util.concurrent.atomic.AtomicInteger; * Hadoop processor. */ public class HadoopProcessor extends HadoopProcessorAdapter { + /** Class to probe for Hadoop libraries in Ignite classpath. */ + private static final String HADOOP_PROBE_CLS = "org.apache.hadoop.conf.Configuration"; + /** Job ID counter. */ private final AtomicInteger idCtr = new AtomicInteger(); @@ -164,7 +167,14 @@ public class HadoopProcessor extends HadoopProcessorAdapter { /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> submit(HadoopJobId jobId, HadoopJobInfo jobInfo) { - return hctx.jobTracker().submit(jobId, jobInfo); + ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(getClass().getClassLoader()); + + try { + return hctx.jobTracker().submit(jobId, jobInfo); + } + finally { + HadoopCommonUtils.restoreContextClassLoader(oldLdr); + } } /** {@inheritDoc} */ @@ -203,6 +213,26 @@ public class HadoopProcessor extends HadoopProcessorAdapter { throw new IgniteCheckedException(ioe.getMessage(), ioe); } + // Check if Hadoop is in parent class loader classpath. + try { + Class cls = Class.forName(HADOOP_PROBE_CLS, false, getClass().getClassLoader()); + + try { + String path = cls.getProtectionDomain().getCodeSource().getLocation().toString(); + + U.warn(log, "Hadoop libraries are found in Ignite classpath, this could lead to class loading " + + "errors (please remove all Hadoop libraries from Ignite classpath) [path=" + path + ']'); + } + catch (Throwable ignore) { + U.warn(log, "Hadoop libraries are found in Ignite classpath, this could lead to class loading " + + "errors (please remove all Hadoop libraries from Ignite classpath)"); + } + } + catch (Throwable ignore) { + // All is fine. + } + + // Try assembling Hadoop URLs. HadoopClassLoader.hadoopUrls(); }