Repository: incubator-ignite Updated Branches: refs/heads/ignite-386 4c85f1209 -> 28fad1854
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopDataStreamSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopDataStreamSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopDataStreamSelfTest.java deleted file mode 100644 index 98475fb..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopDataStreamSelfTest.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.shuffle.streams; - -import org.apache.ignite.internal.util.offheap.unsafe.*; -import org.apache.ignite.testframework.junits.common.*; - -import java.io.*; -import java.util.*; - -/** - * - */ -public class GridHadoopDataStreamSelfTest extends GridCommonAbstractTest { - - public void testStreams() throws IOException { - GridUnsafeMemory mem = new GridUnsafeMemory(0); - - HadoopDataOutStream out = new HadoopDataOutStream(mem); - - int size = 4 * 1024; - - final long ptr = mem.allocate(size); - - out.buffer().set(ptr, size); - - out.writeBoolean(false); - out.writeBoolean(true); - out.writeBoolean(false); - out.write(17); - out.write(121); - out.write(0xfafa); - out.writeByte(17); - out.writeByte(121); - out.writeByte(0xfafa); - out.writeChar('z'); - out.writeChar('o'); - out.writeChar('r'); - out.writeShort(100); - out.writeShort(Short.MIN_VALUE); - out.writeShort(Short.MAX_VALUE); - out.writeShort(65535); - out.writeShort(65536); // 0 - out.writeInt(Integer.MAX_VALUE); - out.writeInt(Integer.MIN_VALUE); - out.writeInt(-1); - out.writeInt(0); - out.writeInt(1); - out.writeFloat(0.33f); - out.writeFloat(0.5f); - out.writeFloat(-0.7f); - out.writeFloat(Float.MAX_VALUE); - out.writeFloat(Float.MIN_VALUE); - out.writeFloat(Float.MIN_NORMAL); - out.writeFloat(Float.POSITIVE_INFINITY); - out.writeFloat(Float.NEGATIVE_INFINITY); - out.writeFloat(Float.NaN); - out.writeDouble(-12312312.3333333336666779); - out.writeDouble(123123.234); - out.writeDouble(Double.MAX_VALUE); - out.writeDouble(Double.MIN_VALUE); - out.writeDouble(Double.MIN_NORMAL); - out.writeDouble(Double.NEGATIVE_INFINITY); - out.writeDouble(Double.POSITIVE_INFINITY); - out.writeDouble(Double.NaN); - out.writeLong(Long.MAX_VALUE); - out.writeLong(Long.MIN_VALUE); - out.writeLong(0); - out.writeLong(-1L); - out.write(new byte[]{1,2,3}); - out.write(new byte[]{0,1,2,3}, 1, 2); - out.writeUTF("mom washes rum"); - - HadoopDataInStream in = new HadoopDataInStream(mem); - - in.buffer().set(ptr, out.buffer().pointer()); - - assertEquals(false, in.readBoolean()); - assertEquals(true, in.readBoolean()); - assertEquals(false, in.readBoolean()); - assertEquals(17, in.read()); - assertEquals(121, in.read()); - assertEquals(0xfa, in.read()); - assertEquals(17, in.readByte()); - assertEquals(121, in.readByte()); - assertEquals((byte)0xfa, in.readByte()); - assertEquals('z', in.readChar()); - assertEquals('o', in.readChar()); - assertEquals('r', in.readChar()); - assertEquals(100, in.readShort()); - assertEquals(Short.MIN_VALUE, in.readShort()); - assertEquals(Short.MAX_VALUE, in.readShort()); - assertEquals(-1, in.readShort()); - assertEquals(0, in.readShort()); - assertEquals(Integer.MAX_VALUE, in.readInt()); - assertEquals(Integer.MIN_VALUE, in.readInt()); - assertEquals(-1, in.readInt()); - assertEquals(0, in.readInt()); - assertEquals(1, in.readInt()); - assertEquals(0.33f, in.readFloat()); - assertEquals(0.5f, in.readFloat()); - assertEquals(-0.7f, in.readFloat()); - assertEquals(Float.MAX_VALUE, in.readFloat()); - assertEquals(Float.MIN_VALUE, in.readFloat()); - assertEquals(Float.MIN_NORMAL, in.readFloat()); - assertEquals(Float.POSITIVE_INFINITY, in.readFloat()); - assertEquals(Float.NEGATIVE_INFINITY, in.readFloat()); - assertEquals(Float.NaN, in.readFloat()); - assertEquals(-12312312.3333333336666779, in.readDouble()); - assertEquals(123123.234, in.readDouble()); - assertEquals(Double.MAX_VALUE, in.readDouble()); - assertEquals(Double.MIN_VALUE, in.readDouble()); - assertEquals(Double.MIN_NORMAL, in.readDouble()); - assertEquals(Double.NEGATIVE_INFINITY, in.readDouble()); - assertEquals(Double.POSITIVE_INFINITY, in.readDouble()); - assertEquals(Double.NaN, in.readDouble()); - assertEquals(Long.MAX_VALUE, in.readLong()); - assertEquals(Long.MIN_VALUE, in.readLong()); - assertEquals(0, in.readLong()); - assertEquals(-1, in.readLong()); - - byte[] b = new byte[3]; - - in.read(b); - - assertTrue(Arrays.equals(new byte[]{1,2,3}, b)); - - b = new byte[4]; - - in.read(b, 1, 2); - - assertTrue(Arrays.equals(new byte[]{0, 1, 2, 0}, b)); - - assertEquals("mom washes rum", in.readUTF()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataStreamSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataStreamSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataStreamSelfTest.java new file mode 100644 index 0000000..48b99ab --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataStreamSelfTest.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.shuffle.streams; + +import org.apache.ignite.internal.util.offheap.unsafe.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.util.*; + +/** + * + */ +public class HadoopDataStreamSelfTest extends GridCommonAbstractTest { + + public void testStreams() throws IOException { + GridUnsafeMemory mem = new GridUnsafeMemory(0); + + HadoopDataOutStream out = new HadoopDataOutStream(mem); + + int size = 4 * 1024; + + final long ptr = mem.allocate(size); + + out.buffer().set(ptr, size); + + out.writeBoolean(false); + out.writeBoolean(true); + out.writeBoolean(false); + out.write(17); + out.write(121); + out.write(0xfafa); + out.writeByte(17); + out.writeByte(121); + out.writeByte(0xfafa); + out.writeChar('z'); + out.writeChar('o'); + out.writeChar('r'); + out.writeShort(100); + out.writeShort(Short.MIN_VALUE); + out.writeShort(Short.MAX_VALUE); + out.writeShort(65535); + out.writeShort(65536); // 0 + out.writeInt(Integer.MAX_VALUE); + out.writeInt(Integer.MIN_VALUE); + out.writeInt(-1); + out.writeInt(0); + out.writeInt(1); + out.writeFloat(0.33f); + out.writeFloat(0.5f); + out.writeFloat(-0.7f); + out.writeFloat(Float.MAX_VALUE); + out.writeFloat(Float.MIN_VALUE); + out.writeFloat(Float.MIN_NORMAL); + out.writeFloat(Float.POSITIVE_INFINITY); + out.writeFloat(Float.NEGATIVE_INFINITY); + out.writeFloat(Float.NaN); + out.writeDouble(-12312312.3333333336666779); + out.writeDouble(123123.234); + out.writeDouble(Double.MAX_VALUE); + out.writeDouble(Double.MIN_VALUE); + out.writeDouble(Double.MIN_NORMAL); + out.writeDouble(Double.NEGATIVE_INFINITY); + out.writeDouble(Double.POSITIVE_INFINITY); + out.writeDouble(Double.NaN); + out.writeLong(Long.MAX_VALUE); + out.writeLong(Long.MIN_VALUE); + out.writeLong(0); + out.writeLong(-1L); + out.write(new byte[]{1,2,3}); + out.write(new byte[]{0,1,2,3}, 1, 2); + out.writeUTF("mom washes rum"); + + HadoopDataInStream in = new HadoopDataInStream(mem); + + in.buffer().set(ptr, out.buffer().pointer()); + + assertEquals(false, in.readBoolean()); + assertEquals(true, in.readBoolean()); + assertEquals(false, in.readBoolean()); + assertEquals(17, in.read()); + assertEquals(121, in.read()); + assertEquals(0xfa, in.read()); + assertEquals(17, in.readByte()); + assertEquals(121, in.readByte()); + assertEquals((byte)0xfa, in.readByte()); + assertEquals('z', in.readChar()); + assertEquals('o', in.readChar()); + assertEquals('r', in.readChar()); + assertEquals(100, in.readShort()); + assertEquals(Short.MIN_VALUE, in.readShort()); + assertEquals(Short.MAX_VALUE, in.readShort()); + assertEquals(-1, in.readShort()); + assertEquals(0, in.readShort()); + assertEquals(Integer.MAX_VALUE, in.readInt()); + assertEquals(Integer.MIN_VALUE, in.readInt()); + assertEquals(-1, in.readInt()); + assertEquals(0, in.readInt()); + assertEquals(1, in.readInt()); + assertEquals(0.33f, in.readFloat()); + assertEquals(0.5f, in.readFloat()); + assertEquals(-0.7f, in.readFloat()); + assertEquals(Float.MAX_VALUE, in.readFloat()); + assertEquals(Float.MIN_VALUE, in.readFloat()); + assertEquals(Float.MIN_NORMAL, in.readFloat()); + assertEquals(Float.POSITIVE_INFINITY, in.readFloat()); + assertEquals(Float.NEGATIVE_INFINITY, in.readFloat()); + assertEquals(Float.NaN, in.readFloat()); + assertEquals(-12312312.3333333336666779, in.readDouble()); + assertEquals(123123.234, in.readDouble()); + assertEquals(Double.MAX_VALUE, in.readDouble()); + assertEquals(Double.MIN_VALUE, in.readDouble()); + assertEquals(Double.MIN_NORMAL, in.readDouble()); + assertEquals(Double.NEGATIVE_INFINITY, in.readDouble()); + assertEquals(Double.POSITIVE_INFINITY, in.readDouble()); + assertEquals(Double.NaN, in.readDouble()); + assertEquals(Long.MAX_VALUE, in.readLong()); + assertEquals(Long.MIN_VALUE, in.readLong()); + assertEquals(0, in.readLong()); + assertEquals(-1, in.readLong()); + + byte[] b = new byte[3]; + + in.read(b); + + assertTrue(Arrays.equals(new byte[]{1,2,3}, b)); + + b = new byte[4]; + + in.read(b, 1, 2); + + assertTrue(Arrays.equals(new byte[]{0, 1, 2, 0}, b)); + + assertEquals("mom washes rum", in.readUTF()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopExternalTaskExecutionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopExternalTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopExternalTaskExecutionSelfTest.java deleted file mode 100644 index ab65e77..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopExternalTaskExecutionSelfTest.java +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor.external; - -import org.apache.hadoop.conf.*; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.io.*; -import org.apache.hadoop.mapreduce.*; -import org.apache.hadoop.mapreduce.lib.input.*; -import org.apache.hadoop.mapreduce.lib.output.*; -import org.apache.ignite.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.util.typedef.*; - -import java.io.*; -import java.util.*; - -import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; - -/** - * Job tracker self test. - */ -public class GridHadoopExternalTaskExecutionSelfTest extends GridHadoopAbstractSelfTest { - /** {@inheritDoc} */ - @Override protected boolean igfsEnabled() { - return true; - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - startGrids(gridCount()); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(); - } - - /** {@inheritDoc} */ - @Override public GridHadoopConfiguration hadoopConfiguration(String gridName) { - GridHadoopConfiguration cfg = super.hadoopConfiguration(gridName); - - cfg.setExternalExecution(true); - - return cfg; - } - - /** - * @throws Exception If failed. - */ - public void testSimpleTaskSubmit() throws Exception { - String testInputFile = "/test"; - - prepareTestFile(testInputFile); - - Configuration cfg = new Configuration(); - - setupFileSystems(cfg); - - Job job = Job.getInstance(cfg); - - job.setMapperClass(TestMapper.class); - job.setCombinerClass(TestReducer.class); - job.setReducerClass(TestReducer.class); - - job.setMapOutputKeyClass(Text.class); - job.setMapOutputValueClass(IntWritable.class); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(IntWritable.class); - - job.setNumReduceTasks(1); - - FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/" + testInputFile)); - FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output")); - - job.setJarByClass(getClass()); - - IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new GridHadoopJobId(UUID.randomUUID(), 1), - createJobInfo(job.getConfiguration())); - - fut.get(); - } - - /** - * @throws Exception If failed. - */ - public void testMapperException() throws Exception { - String testInputFile = "/test"; - - prepareTestFile(testInputFile); - - Configuration cfg = new Configuration(); - - setupFileSystems(cfg); - - Job job = Job.getInstance(cfg); - - job.setMapperClass(TestFailingMapper.class); - job.setCombinerClass(TestReducer.class); - job.setReducerClass(TestReducer.class); - - job.setMapOutputKeyClass(Text.class); - job.setMapOutputValueClass(IntWritable.class); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(IntWritable.class); - - job.setNumReduceTasks(1); - - FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/" + testInputFile)); - FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output")); - - job.setJarByClass(getClass()); - - IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new GridHadoopJobId(UUID.randomUUID(), 1), - createJobInfo(job.getConfiguration())); - - try { - fut.get(); - } - catch (IgniteCheckedException e) { - IOException exp = X.cause(e, IOException.class); - - assertNotNull(exp); - assertEquals("Test failure", exp.getMessage()); - } - } - - /** - * @param filePath File path to prepare. - * @throws Exception If failed. - */ - private void prepareTestFile(String filePath) throws Exception { - IgniteFs igfs = grid(0).fileSystem(igfsName); - - try (IgfsOutputStream out = igfs.create(new IgfsPath(filePath), true)) { - PrintWriter wr = new PrintWriter(new OutputStreamWriter(out)); - - for (int i = 0; i < 1000; i++) - wr.println("Hello, world: " + i); - - wr.flush(); - } - } - - /** - * - */ - private static class TestMapper extends Mapper<Object, Text, Text, IntWritable> { - /** One constant. */ - private IntWritable one = new IntWritable(1); - - /** Line constant. */ - private Text line = new Text("line"); - - @Override protected void map(Object key, Text val, Context ctx) throws IOException, InterruptedException { - ctx.write(line, one); - } - } - - /** - * Failing mapper. - */ - private static class TestFailingMapper extends Mapper<Object, Text, Text, IntWritable> { - @Override protected void map(Object key, Text val, Context c) throws IOException, InterruptedException { - throw new IOException("Test failure"); - } - } - - /** - * - */ - private static class TestReducer extends Reducer<Text, IntWritable, Text, IntWritable> { - /** Line constant. */ - private Text line = new Text("line"); - - @Override protected void setup(Context ctx) throws IOException, InterruptedException { - super.setup(ctx); - } - - /** {@inheritDoc} */ - @Override protected void reduce(Text key, Iterable<IntWritable> values, Context ctx) - throws IOException, InterruptedException { - int s = 0; - - for (IntWritable val : values) - s += val.get(); - - System.out.println(">>>> Reduced: " + s); - - ctx.write(line, new IntWritable(s)); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java new file mode 100644 index 0000000..3735aab --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.taskexecutor.external; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.io.*; +import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.lib.input.*; +import org.apache.hadoop.mapreduce.lib.output.*; +import org.apache.ignite.*; +import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.util.typedef.*; + +import java.io.*; +import java.util.*; + +import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; + +/** + * Job tracker self test. + */ +public class HadoopExternalTaskExecutionSelfTest extends HadoopAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected boolean igfsEnabled() { + return true; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + startGrids(gridCount()); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override public GridHadoopConfiguration hadoopConfiguration(String gridName) { + GridHadoopConfiguration cfg = super.hadoopConfiguration(gridName); + + cfg.setExternalExecution(true); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testSimpleTaskSubmit() throws Exception { + String testInputFile = "/test"; + + prepareTestFile(testInputFile); + + Configuration cfg = new Configuration(); + + setupFileSystems(cfg); + + Job job = Job.getInstance(cfg); + + job.setMapperClass(TestMapper.class); + job.setCombinerClass(TestReducer.class); + job.setReducerClass(TestReducer.class); + + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(IntWritable.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + + job.setNumReduceTasks(1); + + FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/" + testInputFile)); + FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output")); + + job.setJarByClass(getClass()); + + IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new GridHadoopJobId(UUID.randomUUID(), 1), + createJobInfo(job.getConfiguration())); + + fut.get(); + } + + /** + * @throws Exception If failed. + */ + public void testMapperException() throws Exception { + String testInputFile = "/test"; + + prepareTestFile(testInputFile); + + Configuration cfg = new Configuration(); + + setupFileSystems(cfg); + + Job job = Job.getInstance(cfg); + + job.setMapperClass(TestFailingMapper.class); + job.setCombinerClass(TestReducer.class); + job.setReducerClass(TestReducer.class); + + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(IntWritable.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + + job.setNumReduceTasks(1); + + FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/" + testInputFile)); + FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output")); + + job.setJarByClass(getClass()); + + IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new GridHadoopJobId(UUID.randomUUID(), 1), + createJobInfo(job.getConfiguration())); + + try { + fut.get(); + } + catch (IgniteCheckedException e) { + IOException exp = X.cause(e, IOException.class); + + assertNotNull(exp); + assertEquals("Test failure", exp.getMessage()); + } + } + + /** + * @param filePath File path to prepare. + * @throws Exception If failed. + */ + private void prepareTestFile(String filePath) throws Exception { + IgniteFs igfs = grid(0).fileSystem(igfsName); + + try (IgfsOutputStream out = igfs.create(new IgfsPath(filePath), true)) { + PrintWriter wr = new PrintWriter(new OutputStreamWriter(out)); + + for (int i = 0; i < 1000; i++) + wr.println("Hello, world: " + i); + + wr.flush(); + } + } + + /** + * + */ + private static class TestMapper extends Mapper<Object, Text, Text, IntWritable> { + /** One constant. */ + private IntWritable one = new IntWritable(1); + + /** Line constant. */ + private Text line = new Text("line"); + + @Override protected void map(Object key, Text val, Context ctx) throws IOException, InterruptedException { + ctx.write(line, one); + } + } + + /** + * Failing mapper. + */ + private static class TestFailingMapper extends Mapper<Object, Text, Text, IntWritable> { + @Override protected void map(Object key, Text val, Context c) throws IOException, InterruptedException { + throw new IOException("Test failure"); + } + } + + /** + * + */ + private static class TestReducer extends Reducer<Text, IntWritable, Text, IntWritable> { + /** Line constant. */ + private Text line = new Text("line"); + + @Override protected void setup(Context ctx) throws IOException, InterruptedException { + super.setup(ctx); + } + + /** {@inheritDoc} */ + @Override protected void reduce(Text key, Iterable<IntWritable> values, Context ctx) + throws IOException, InterruptedException { + int s = 0; + + for (IntWritable val : values) + s += val.get(); + + System.out.println(">>>> Reduced: " + s); + + ctx.write(line, new IntWritable(s)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java index 5371f56..1398886 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java @@ -79,46 +79,46 @@ public class IgniteHadoopTestSuite extends TestSuite { suite.addTest(IgfsEventsTestSuite.suiteNoarchOnly()); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopFileSystemsTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopFileSystemsTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopValidationSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopValidationSelfTest.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(HadoopDefaultMapReducePlannerSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopJobTrackerSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopJobTrackerSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopHashMapSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopDataStreamSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopHashMapSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopDataStreamSelfTest.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(GridHadoopConcurrentHashMultimapSelftest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopSkipListSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopSkipListSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopTaskExecutionSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopTaskExecutionSelfTest.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(HadoopV2JobSelfTest.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(HadoopSerializationWrapperSelfTest.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(HadoopSplitWrapperSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopTasksV1Test.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopTasksV2Test.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopTasksV1Test.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopTasksV2Test.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopMapReduceTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopMapReduceTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopMapReduceEmbeddedSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopMapReduceEmbeddedSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopExternalTaskExecutionSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopExternalTaskExecutionSelfTest.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(HadoopExternalCommunicationSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopSortingTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopSortingTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopSortingExternalTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopSortingExternalTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopGroupingTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopGroupingTest.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(HadoopClientProtocolSelfTest.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(HadoopClientProtocolEmbeddedSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopCommandLineTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopCommandLineTest.class.getName()))); return suite; }