IGNITE-2218: Fixed a problem with native Hadoop library loading. This closes #378.
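
[Editor's note on the fix, for context.] The JVM links a given native library (here libhadoop) to exactly one class loader; a second System.loadLibrary() of the same library from another loader fails with UnsatisfiedLinkError. Rather than substituting a fake NativeCodeLoader (the HadoopNativeCodeLoader stub deleted below), this commit loads the library once in the application class loader and then reflectively registers the already-loaded library handle with each HadoopClassLoader, so every loader can link native methods. A minimal standalone sketch of the same trick follows; it assumes JDK 8-era java.lang.ClassLoader internals (the private "nativeLibraries" Vector and the "name" field of its entries), and the NativeLibRelink/relink names are illustrative, not part of the patch:

    import java.lang.reflect.Field;
    import java.util.Vector;

    public class NativeLibRelink {
        /**
         * Copies the handle of an already-loaded native library whose name contains
         * {@code libFragment} from an ancestor loader into {@code child}, so the child
         * believes the library is loaded and can link its native methods.
         */
        @SuppressWarnings("unchecked")
        public static void relink(ClassLoader child, String libFragment) throws Exception {
            // JDK-internal bookkeeping of loaded native libraries (JDK 8 era).
            Field libsField = ClassLoader.class.getDeclaredField("nativeLibraries");
            libsField.setAccessible(true);

            Vector<Object> childLibs = (Vector<Object>)libsField.get(child);

            for (ClassLoader ldr = child.getParent(); ldr != null; ldr = ldr.getParent()) {
                for (Object lib : (Vector<Object>)libsField.get(ldr)) {
                    Field nameField = lib.getClass().getDeclaredField("name");
                    nameField.setAccessible(true);

                    if (((String)nameField.get(lib)).contains(libFragment)) {
                        childLibs.add(lib); // Child loader now sees the library as loaded.

                        return;
                    }
                }
            }
        }
    }

In the patch below, HadoopClassLoader.initializeNativeLibraries() does exactly this via Ignite's U.field() helper, after forcing the load with Class.forName(NativeCodeLoader...) in the application class loader.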
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7d58d14a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7d58d14a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7d58d14a

Branch: refs/heads/ignite-2236
Commit: 7d58d14a80b1c32f88fbb4cf68e5d289c5aee474
Parents: 012ca73
Author: vozerov-gridgain <voze...@gridgain.com>
Authored: Mon Jan 4 12:14:58 2016 +0400
Committer: vozerov-gridgain <voze...@gridgain.com>
Committed: Mon Jan 4 12:14:58 2016 +0400

----------------------------------------------------------------------
 .../processors/hadoop/HadoopClassLoader.java    |  71 ++++++++++---
 .../hadoop/v2/HadoopNativeCodeLoader.java       |  74 --------------
 .../hadoop/HadoopAbstractWordCountTest.java     |  46 +++++++--
 .../hadoop/HadoopMapReduceEmbeddedSelfTest.java |   2 +-
 .../processors/hadoop/HadoopMapReduceTest.java  |  15 ++-
 .../hadoop/HadoopSnappyFullMapReduceTest.java   |  28 +++++
 .../processors/hadoop/HadoopSnappyTest.java     | 102 +++++++++++++++++++
 .../processors/hadoop/HadoopTasksV2Test.java    |   2 +-
 .../hadoop/examples/HadoopWordCount1Reduce.java |   1 +
 .../hadoop/examples/HadoopWordCount2.java       |  18 +++-
 .../examples/HadoopWordCount2Reducer.java       |   1 +
 .../testsuites/IgniteHadoopTestSuite.java       |  18 +++-
 12 files changed, 279 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7d58d14a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
index 735133f..270b31d 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
@@ -30,13 +30,14 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-
+import java.util.Vector;
+import org.apache.hadoop.util.NativeCodeLoader;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.hadoop.v2.HadoopDaemon;
-import org.apache.ignite.internal.processors.hadoop.v2.HadoopNativeCodeLoader;
 import org.apache.ignite.internal.processors.hadoop.v2.HadoopShutdownHookManager;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 import org.objectweb.asm.AnnotationVisitor;
@@ -69,6 +70,9 @@ public class HadoopClassLoader extends URLClassLoader {
     /** Name of the Hadoop daemon class. */
     public static final String HADOOP_DAEMON_CLASS_NAME = "org.apache.hadoop.util.Daemon";
 
+    /** Name of libhadoop library. */
+    private static final String LIBHADOOP = "hadoop.";
+
     /** */
     private static final URLClassLoader APP_CLS_LDR = (URLClassLoader)HadoopClassLoader.class.getClassLoader();
 
@@ -119,6 +123,51 @@ public class HadoopClassLoader extends URLClassLoader {
         assert !(getParent() instanceof HadoopClassLoader);
 
         this.name = name;
+
+        initializeNativeLibraries();
+    }
+
+    /**
+     * Workaround to load native Hadoop libraries. Java doesn't allow native libraries to be loaded from different
+     * classloaders. But we load Hadoop classes many times and one of these classes - {@code NativeCodeLoader} - tries
+     * to load the same native library over and over again.
+     * <p>
+     * To fix the problem, we force native library load in parent class loader and then "link" handle to this native
+     * library to our class loader. As a result, our class loader will think that the library is already loaded and will
+     * be able to link native methods.
+     *
+     * @see <a href="http://docs.oracle.com/javase/1.5.0/docs/guide/jni/spec/invocation.html#library_version">
+     *     JNI specification</a>
+     */
+    private void initializeNativeLibraries() {
+        try {
+            // This must trigger native library load.
+            Class.forName(NativeCodeLoader.class.getName(), true, APP_CLS_LDR);
+
+            final Vector<Object> curVector = U.field(this, "nativeLibraries");
+
+            ClassLoader ldr = APP_CLS_LDR;
+
+            while (ldr != null) {
+                Vector vector = U.field(ldr, "nativeLibraries");
+
+                for (Object lib : vector) {
+                    String libName = U.field(lib, "name");
+
+                    if (libName.contains(LIBHADOOP)) {
+                        curVector.add(lib);
+
+                        return;
+                    }
+                }
+
+                ldr = ldr.getParent();
+            }
+        }
+        catch (Exception e) {
+            U.quietAndWarn(null, "Failed to initialize Hadoop native library " +
+                "(native Hadoop methods might not work properly): " + e);
+        }
     }
 
     /**
@@ -152,8 +201,6 @@ public class HadoopClassLoader extends URLClassLoader {
         if (isHadoop(name)) { // Always load Hadoop classes explicitly, since Hadoop can be available in App classpath.
             if (name.endsWith(".util.ShutdownHookManager")) // Dirty hack to get rid of Hadoop shutdown hooks.
                 return loadFromBytes(name, HadoopShutdownHookManager.class.getName());
-            else if (name.endsWith(".util.NativeCodeLoader"))
-                return loadFromBytes(name, HadoopNativeCodeLoader.class.getName());
             else if (name.equals(HADOOP_DAEMON_CLASS_NAME))
                 // We replace this in order to be able to forcibly stop some daemon threads
                 // that otherwise never stop (e.g. PeerCache runnables):
@@ -274,7 +321,7 @@ public class HadoopClassLoader extends URLClassLoader {
 
     /**
      * Check whether class has external dependencies on Hadoop.
-     * 
+     *
      * @param clsName Class name.
      * @return {@code True} if class has external dependencies.
      */
@@ -285,15 +332,15 @@ public class HadoopClassLoader extends URLClassLoader {
         ctx.mthdVisitor = new CollectingMethodVisitor(ctx, ctx.annVisitor);
         ctx.fldVisitor = new CollectingFieldVisitor(ctx, ctx.annVisitor);
         ctx.clsVisitor = new CollectingClassVisitor(ctx, ctx.annVisitor, ctx.mthdVisitor, ctx.fldVisitor);
-        
+
         return hasExternalDependencies(clsName, ctx);
     }
-    
+
     /**
      * Check whether class has external dependencies on Hadoop.
-     * 
+     *
      * @param clsName Class name.
-     * @param ctx Context. 
+     * @param ctx Context.
      * @return {@code true} If the class has external dependencies.
      */
     boolean hasExternalDependencies(String clsName, CollectingContext ctx) {
@@ -519,7 +566,7 @@ public class HadoopClassLoader extends URLClassLoader {
 
         /** Field visitor. */
         private FieldVisitor fldVisitor;
-        
+
         /** Class visitor. */
         private ClassVisitor clsVisitor;
 
@@ -627,7 +674,7 @@ public class HadoopClassLoader extends URLClassLoader {
                     onType(t);
                 }
             }
-        } 
+        }
 
     /**
      * Annotation visitor.
@@ -638,7 +685,7 @@ public class HadoopClassLoader extends URLClassLoader {
     /**
      * Annotation visitor.
-     * 
+     *
      * @param ctx The collector.
      */
     CollectingAnnotationVisitor(CollectingContext ctx) {


http://git-wip-us.apache.org/repos/asf/ignite/blob/7d58d14a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopNativeCodeLoader.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopNativeCodeLoader.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopNativeCodeLoader.java
deleted file mode 100644
index 4c4840d..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopNativeCodeLoader.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * A fake helper to load the native hadoop code i.e. libhadoop.so.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class HadoopNativeCodeLoader {
-    /**
-     * Check if native-hadoop code is loaded for this platform.
-     *
-     * @return <code>true</code> if native-hadoop is loaded,
-     *         else <code>false</code>
-     */
-    public static boolean isNativeCodeLoaded() {
-        return false;
-    }
-
-    /**
-     * Returns true only if this build was compiled with support for snappy.
-     */
-    public static boolean buildSupportsSnappy() {
-        return false;
-    }
-
-    /**
-     * @return Library name.
-     */
-    public static String getLibraryName() {
-        throw new IllegalStateException();
-    }
-
-    /**
-     * Return if native hadoop libraries, if present, can be used for this job.
-     * @param conf configuration
-     *
-     * @return <code>true</code> if native hadoop libraries, if present, can be
-     *         used for this job; <code>false</code> otherwise.
-     */
-    public boolean getLoadNativeLibraries(Configuration conf) {
-        return false;
-    }
-
-    /**
-     * Set if native hadoop libraries, if present, can be used for this job.
-     *
-     * @param conf configuration
-     * @param loadNativeLibraries can native hadoop libraries be loaded
-     */
-    public void setLoadNativeLibraries(Configuration conf, boolean loadNativeLibraries) {
-        // No-op.
-    }
-}


http://git-wip-us.apache.org/repos/asf/ignite/blob/7d58d14a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java
index a47eaf6..e45c127 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.hadoop;
 
 import com.google.common.base.Joiner;
 import java.io.BufferedReader;
+import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.PrintWriter;
 import java.util.ArrayList;
@@ -26,6 +27,11 @@ import java.util.Collections;
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.ignite.igfs.IgfsPath;
 import org.apache.ignite.internal.processors.igfs.IgfsEx;
 
@@ -118,21 +124,49 @@ public abstract class HadoopAbstractWordCountTest extends HadoopAbstractSelfTest
     }
 
     /**
+     * Read w/o decoding (default).
+     *
+     * @param fileName The file.
+     * @return The file contents, human-readable.
+     * @throws Exception On error.
+     */
+    protected String readAndSortFile(String fileName) throws Exception {
+        return readAndSortFile(fileName, null);
+    }
+
+    /**
      * Reads whole text file into String.
      *
      * @param fileName Name of the file to read.
      * @return Content of the file as String value.
     * @throws Exception If could not read the file.
     */
-    protected String readAndSortFile(String fileName) throws Exception {
-        BufferedReader reader = new BufferedReader(new InputStreamReader(igfs.open(new IgfsPath(fileName))));
+    protected String readAndSortFile(String fileName, Configuration conf) throws Exception {
+        final List<String> list = new ArrayList<>();
+
+        final boolean snappyDecode = conf != null && conf.getBoolean(FileOutputFormat.COMPRESS, false);
+
+        if (snappyDecode) {
+            try (SequenceFile.Reader reader = new SequenceFile.Reader(conf,
+                SequenceFile.Reader.file(new Path(fileName)))) {
+                Text key = new Text();
 
-        List<String> list = new ArrayList<>();
+                IntWritable val = new IntWritable();
 
-        String line;
+                while (reader.next(key, val))
+                    list.add(key + "\t" + val);
+            }
+        }
+        else {
+            try (InputStream is0 = igfs.open(new IgfsPath(fileName))) {
+                BufferedReader reader = new BufferedReader(new InputStreamReader(is0));
+
+                String line;
 
-        while ((line = reader.readLine()) != null)
-            list.add(line);
+                while ((line = reader.readLine()) != null)
+                    list.add(line);
+            }
+        }
 
         Collections.sort(list);


http://git-wip-us.apache.org/repos/asf/ignite/blob/7d58d14a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java
index c0eff48..25ef382 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java
@@ -106,7 +106,7 @@ public class HadoopMapReduceEmbeddedSelfTest extends HadoopMapReduceTest {
 
             Job job = Job.getInstance(jobConf);
 
-            HadoopWordCount2.setTasksClasses(job, useNewAPI, useNewAPI, useNewAPI);
+            HadoopWordCount2.setTasksClasses(job, useNewAPI, useNewAPI, useNewAPI, false);
 
             if (useNewAPI) {
                 job.setPartitionerClass(CustomV2Partitioner.class);


http://git-wip-us.apache.org/repos/asf/ignite/blob/7d58d14a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
index d0bd92b..7fd8272 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
@@ -183,7 +183,7 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
 
             Job job = Job.getInstance(jobConf);
 
-            HadoopWordCount2.setTasksClasses(job, useNewMapper, useNewCombiner, useNewReducer);
+            HadoopWordCount2.setTasksClasses(job, useNewMapper, useNewCombiner, useNewReducer, compressOutputSnappy());
 
             job.setOutputKeyClass(Text.class);
             job.setOutputValueClass(IntWritable.class);
@@ -207,18 +207,29 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
 
             checkOwner(new IgfsPath(outFile));
 
+            String actual = readAndSortFile(outFile, job.getConfiguration());
+
             assertEquals("Use new mapper: " + useNewMapper + ", new combiner: " + useNewCombiner + ", new reducer: " +
                 useNewReducer,
                 "blue\t" + blue + "\n" +
                 "green\t" + green + "\n" +
                 "red\t" + red + "\n" +
                 "yellow\t" + yellow + "\n",
-                readAndSortFile(outFile)
+                actual
             );
         }
     }
 
     /**
+     * Gets if to compress output data with Snappy.
+     *
+     * @return If to compress output data with Snappy.
+     */
+    protected boolean compressOutputSnappy() {
+        return false;
+    }
+
+    /**
      * Simple test job statistics.
      *
      * @param jobId Job id.


http://git-wip-us.apache.org/repos/asf/ignite/blob/7d58d14a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyFullMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyFullMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyFullMapReduceTest.java
new file mode 100644
index 0000000..22d33a5
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyFullMapReduceTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+/**
+ * Same test as HadoopMapReduceTest, but with enabled Snappy output compression.
+ */
+public class HadoopSnappyFullMapReduceTest extends HadoopMapReduceTest {
+    /** {@inheritDoc} */
+    @Override protected boolean compressOutputSnappy() {
+        return true;
+    }
+}


http://git-wip-us.apache.org/repos/asf/ignite/blob/7d58d14a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java
new file mode 100644
index 0000000..014ff1e
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.Arrays;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.SnappyCodec;
+import org.apache.hadoop.io.compress.snappy.SnappyCompressor;
+import org.apache.hadoop.util.NativeCodeLoader;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests isolated Hadoop Snappy codec usage.
+ */
+public class HadoopSnappyTest extends GridCommonAbstractTest {
+    /** Length of data. */
+    private static final int BYTE_SIZE = 1024 * 50;
+
+    /**
+     * Checks Snappy codec usage.
+     *
+     * @throws Exception On error.
+     */
+    public void testSnappy() throws Throwable {
+        // Run Snappy test in default class loader:
+        checkSnappy();
+
+        // Run the same in several more class loaders simulating jobs and tasks:
+        for (int i = 0; i < 2; i++) {
+            ClassLoader hadoopClsLdr = new HadoopClassLoader(null, "cl-" + i);
+
+            Class<?> cls = (Class)Class.forName(HadoopSnappyTest.class.getName(), true, hadoopClsLdr);
+
+            assertEquals(hadoopClsLdr, cls.getClassLoader());
+
+            U.invoke(cls, null, "checkSnappy");
+        }
+    }
+
+    /**
+     * Internal check routine.
+     *
+     * @throws Throwable If failed.
+     */
+    public static void checkSnappy() throws Throwable {
+        try {
+            byte[] expBytes = new byte[BYTE_SIZE];
+            byte[] actualBytes = new byte[BYTE_SIZE];
+
+            for (int i = 0; i < expBytes.length ; i++)
+                expBytes[i] = (byte)ThreadLocalRandom.current().nextInt(16);
+
+            SnappyCodec codec = new SnappyCodec();
+
+            codec.setConf(new Configuration());
+
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+            try (CompressionOutputStream cos = codec.createOutputStream(baos)) {
+                cos.write(expBytes);
+
+                cos.flush();
+            }
+
+            try (CompressionInputStream cis = codec.createInputStream(new ByteArrayInputStream(baos.toByteArray()))) {
+                int read = cis.read(actualBytes, 0, actualBytes.length);
+
+                assert read == actualBytes.length;
+            }
+
+            assert Arrays.equals(expBytes, actualBytes);
+        }
+        catch (Throwable e) {
+            System.out.println("Snappy check failed:");
+
+            System.out.println("### NativeCodeLoader.isNativeCodeLoaded: " + NativeCodeLoader.isNativeCodeLoaded());
+            System.out.println("### SnappyCompressor.isNativeCodeLoaded: " + SnappyCompressor.isNativeCodeLoaded());
+
+            throw e;
+        }
+    }
+}


http://git-wip-us.apache.org/repos/asf/ignite/blob/7d58d14a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
index 3a964d6..d125deb 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
@@ -48,7 +48,7 @@ public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest {
         job.setOutputKeyClass(Text.class);
         job.setOutputValueClass(IntWritable.class);
 
-        HadoopWordCount2.setTasksClasses(job, true, true, true);
+        HadoopWordCount2.setTasksClasses(job, true, true, true, false);
 
         Configuration conf = job.getConfiguration();


http://git-wip-us.apache.org/repos/asf/ignite/blob/7d58d14a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Reduce.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Reduce.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Reduce.java
index 120ac19..2335911 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Reduce.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Reduce.java
@@ -47,6 +47,7 @@ public class HadoopWordCount1Reduce extends MapReduceBase implements Reducer<Tex
         output.collect(key, new IntWritable(sum));
     }
 
+    /** {@inheritDoc} */
     @Override public void configure(JobConf job) {
         super.configure(job);


http://git-wip-us.apache.org/repos/asf/ignite/blob/7d58d14a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java
index 942a908..4b508ca 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java
@@ -20,11 +20,14 @@ package org.apache.ignite.internal.processors.hadoop.examples;
 
 import java.io.IOException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.SnappyCodec;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 
 /**
@@ -62,7 +65,7 @@ public class HadoopWordCount2 {
         job.setOutputKeyClass(Text.class);
         job.setOutputValueClass(IntWritable.class);
 
-        setTasksClasses(job, true, true, true);
+        setTasksClasses(job, true, true, true, false);
 
         FileInputFormat.setInputPaths(job, new Path(input));
         FileOutputFormat.setOutputPath(job, new Path(output));
@@ -80,7 +83,8 @@ public class HadoopWordCount2 {
      * @param setCombiner Option to set combiner class.
      * @param setReducer Option to set reducer and output format classes.
      */
-    public static void setTasksClasses(Job job, boolean setMapper, boolean setCombiner, boolean setReducer) {
+    public static void setTasksClasses(Job job, boolean setMapper, boolean setCombiner, boolean setReducer,
+        boolean outputCompression) {
         if (setMapper) {
             job.setMapperClass(HadoopWordCount2Mapper.class);
             job.setInputFormatClass(TextInputFormat.class);
@@ -93,5 +97,15 @@ public class HadoopWordCount2 {
             job.setReducerClass(HadoopWordCount2Reducer.class);
             job.setOutputFormatClass(TextOutputFormat.class);
         }
+
+        if (outputCompression) {
+            job.setOutputFormatClass(SequenceFileOutputFormat.class);
+
+            SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);
+
+            SequenceFileOutputFormat.setCompressOutput(job, true);
+
+            job.getConfiguration().set(FileOutputFormat.COMPRESS_CODEC, SnappyCodec.class.getName());
+        }
     }
 }
\ No newline at end of file


http://git-wip-us.apache.org/repos/asf/ignite/blob/7d58d14a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Reducer.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Reducer.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Reducer.java
index b2be53e..63a9d95 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Reducer.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Reducer.java
@@ -55,6 +55,7 @@ public class HadoopWordCount2Reducer extends Reducer<Text, IntWritable, Text, In
     /** {@inheritDoc} */
     @Override protected void setup(Context context) throws IOException, InterruptedException {
         super.setup(context);
+
         wasSetUp = true;
     }


http://git-wip-us.apache.org/repos/asf/ignite/blob/7d58d14a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
index 1831085..6c542b5 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
@@ -25,6 +25,8 @@ import java.io.IOException;
 import java.net.URL;
 import java.net.URLConnection;
 import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.List;
 import junit.framework.TestSuite;
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
@@ -63,6 +65,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopJobTrackerSelfTest;
 import org.apache.ignite.internal.processors.hadoop.HadoopMapReduceEmbeddedSelfTest;
 import org.apache.ignite.internal.processors.hadoop.HadoopMapReduceTest;
 import org.apache.ignite.internal.processors.hadoop.HadoopSerializationWrapperSelfTest;
+import org.apache.ignite.internal.processors.hadoop.HadoopSnappyTest;
 import org.apache.ignite.internal.processors.hadoop.HadoopSortingTest;
 import org.apache.ignite.internal.processors.hadoop.HadoopSplitWrapperSelfTest;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskExecutionSelfTest;
@@ -70,6 +73,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopTasksV1Test;
 import org.apache.ignite.internal.processors.hadoop.HadoopTasksV2Test;
 import org.apache.ignite.internal.processors.hadoop.HadoopV2JobSelfTest;
 import org.apache.ignite.internal.processors.hadoop.HadoopValidationSelfTest;
+import org.apache.ignite.internal.processors.hadoop.HadoopSnappyFullMapReduceTest;
 import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopConcurrentHashMultimapSelftest;
 import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopHashMapSelfTest;
 import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopSkipListSelfTest;
@@ -96,6 +100,9 @@ public class IgniteHadoopTestSuite extends TestSuite {
 
         TestSuite suite = new TestSuite("Ignite Hadoop MR Test Suite");
 
+        suite.addTest(new TestSuite(ldr.loadClass(HadoopSnappyTest.class.getName())));
+        suite.addTest(new TestSuite(ldr.loadClass(HadoopSnappyFullMapReduceTest.class.getName())));
+
         suite.addTest(new TestSuite(ldr.loadClass(HadoopClassLoaderTest.class.getName())));
 
         suite.addTest(new TestSuite(ldr.loadClass(HadoopIgfs20FileSystemLoopbackPrimarySelfTest.class.getName())));
@@ -192,7 +199,7 @@ public class IgniteHadoopTestSuite extends TestSuite {
 
         X.println("Will use Hadoop version: " + ver);
 
-        String downloadPath = "hadoop/common/hadoop-" + ver + "/hadoop-" + ver + ".tar.gz";
+        String downloadPath = "hadoop/core/hadoop-" + ver + "/hadoop-" + ver + ".tar.gz";
 
         download("Hadoop", "HADOOP_HOME", downloadPath, "hadoop-" + ver);
     }
@@ -217,6 +224,7 @@ public class IgniteHadoopTestSuite extends TestSuite {
         }
 
         List<String> urls = F.asList(
+            "http://archive.apache.org/dist/",
             "http://apache-mirror.rbc.ru/pub/apache/",
             "http://www.eu.apache.org/dist/",
             "http://www.us.apache.org/dist/");
@@ -273,6 +281,14 @@ public class IgniteHadoopTestSuite extends TestSuite {
                     if (!dest.mkdirs())
                         throw new IllegalStateException();
                 }
+                else if (entry.isSymbolicLink()) {
+                    // Important: in Hadoop installation there are symlinks, we need to create them:
+                    Path theLinkItself = Paths.get(install.getAbsolutePath(), entry.getName());
+
+                    Path linkTarget = Paths.get(entry.getLinkName());
+
+                    Files.createSymbolicLink(theLinkItself, linkTarget);
+                }
                 else {
                     File parent = dest.getParentFile();
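
[Editor's note.] For reference, the Snappy-compressed output path that the new tests exercise reduces to the following job configuration (standard Hadoop MapReduce API, mirroring the setTasksClasses() change above; the SnappyOutputSetup helper name is illustrative, not part of the patch):

    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.compress.SnappyCodec;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

    public class SnappyOutputSetup {
        /** Configures block-compressed Snappy SequenceFile output for a job. */
        public static void enableSnappyOutput(Job job) {
            job.setOutputFormatClass(SequenceFileOutputFormat.class);

            // Compress blocks of records rather than each value individually.
            SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);
            SequenceFileOutputFormat.setCompressOutput(job, true);

            job.getConfiguration().set(FileOutputFormat.COMPRESS_CODEC, SnappyCodec.class.getName());
        }
    }

This only works when libhadoop and libsnappy can actually be linked, which is what the class-loader fix above makes reliable across multiple HadoopClassLoader instances.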