http://git-wip-us.apache.org/repos/asf/ignite/blob/4c09b38c/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataStreamSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataStreamSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataStreamSelfTest.java deleted file mode 100644 index 3fea0ae..0000000 --- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataStreamSelfTest.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.shuffle.streams; - -import java.io.IOException; -import java.util.Arrays; - -import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; - -/** - * - */ -public class HadoopDataStreamSelfTest extends GridCommonAbstractTest { - - public void testStreams() throws IOException { - GridUnsafeMemory mem = new GridUnsafeMemory(0); - - HadoopDataOutStream out = new HadoopDataOutStream(mem); - - int size = 4 * 1024; - - final long ptr = mem.allocate(size); - - out.buffer().set(ptr, size); - - out.writeBoolean(false); - out.writeBoolean(true); - out.writeBoolean(false); - out.write(17); - out.write(121); - out.write(0xfafa); - out.writeByte(17); - out.writeByte(121); - out.writeByte(0xfafa); - out.writeChar('z'); - out.writeChar('o'); - out.writeChar('r'); - out.writeShort(100); - out.writeShort(Short.MIN_VALUE); - out.writeShort(Short.MAX_VALUE); - out.writeShort(65535); - out.writeShort(65536); // 0 - out.writeInt(Integer.MAX_VALUE); - out.writeInt(Integer.MIN_VALUE); - out.writeInt(-1); - out.writeInt(0); - out.writeInt(1); - out.writeFloat(0.33f); - out.writeFloat(0.5f); - out.writeFloat(-0.7f); - out.writeFloat(Float.MAX_VALUE); - out.writeFloat(Float.MIN_VALUE); - out.writeFloat(Float.MIN_NORMAL); - out.writeFloat(Float.POSITIVE_INFINITY); - out.writeFloat(Float.NEGATIVE_INFINITY); - out.writeFloat(Float.NaN); - out.writeDouble(-12312312.3333333336666779); - out.writeDouble(123123.234); - out.writeDouble(Double.MAX_VALUE); - out.writeDouble(Double.MIN_VALUE); - out.writeDouble(Double.MIN_NORMAL); - out.writeDouble(Double.NEGATIVE_INFINITY); - out.writeDouble(Double.POSITIVE_INFINITY); - out.writeDouble(Double.NaN); - out.writeLong(Long.MAX_VALUE); - out.writeLong(Long.MIN_VALUE); - out.writeLong(0); - out.writeLong(-1L); - out.write(new byte[]{1,2,3}); - out.write(new byte[]{0,1,2,3}, 1, 2); - out.writeUTF("mom washes rum"); - - HadoopDataInStream in = new HadoopDataInStream(mem); - - in.buffer().set(ptr, out.buffer().pointer()); - - assertEquals(false, in.readBoolean()); - assertEquals(true, in.readBoolean()); - assertEquals(false, in.readBoolean()); - assertEquals(17, in.read()); - assertEquals(121, in.read()); - assertEquals(0xfa, in.read()); - assertEquals(17, in.readByte()); - assertEquals(121, in.readByte()); - assertEquals((byte)0xfa, in.readByte()); - assertEquals('z', in.readChar()); - assertEquals('o', in.readChar()); - assertEquals('r', in.readChar()); - assertEquals(100, in.readShort()); - assertEquals(Short.MIN_VALUE, in.readShort()); - assertEquals(Short.MAX_VALUE, in.readShort()); - assertEquals(-1, in.readShort()); - assertEquals(0, in.readShort()); - assertEquals(Integer.MAX_VALUE, in.readInt()); - assertEquals(Integer.MIN_VALUE, in.readInt()); - assertEquals(-1, in.readInt()); - assertEquals(0, in.readInt()); - assertEquals(1, in.readInt()); - assertEquals(0.33f, in.readFloat()); - assertEquals(0.5f, in.readFloat()); - assertEquals(-0.7f, in.readFloat()); - assertEquals(Float.MAX_VALUE, in.readFloat()); - assertEquals(Float.MIN_VALUE, in.readFloat()); - assertEquals(Float.MIN_NORMAL, in.readFloat()); - assertEquals(Float.POSITIVE_INFINITY, in.readFloat()); - assertEquals(Float.NEGATIVE_INFINITY, in.readFloat()); - assertEquals(Float.NaN, in.readFloat()); - assertEquals(-12312312.3333333336666779, in.readDouble()); - assertEquals(123123.234, in.readDouble()); - assertEquals(Double.MAX_VALUE, in.readDouble()); - assertEquals(Double.MIN_VALUE, in.readDouble()); - assertEquals(Double.MIN_NORMAL, in.readDouble()); - assertEquals(Double.NEGATIVE_INFINITY, in.readDouble()); - assertEquals(Double.POSITIVE_INFINITY, in.readDouble()); - assertEquals(Double.NaN, in.readDouble()); - assertEquals(Long.MAX_VALUE, in.readLong()); - assertEquals(Long.MIN_VALUE, in.readLong()); - assertEquals(0, in.readLong()); - assertEquals(-1, in.readLong()); - - byte[] b = new byte[3]; - - in.read(b); - - assertTrue(Arrays.equals(new byte[]{1,2,3}, b)); - - b = new byte[4]; - - in.read(b, 1, 2); - - assertTrue(Arrays.equals(new byte[]{0, 1, 2, 0}, b)); - - assertEquals("mom washes rum", in.readUTF()); - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c09b38c/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java deleted file mode 100644 index 2385668..0000000 --- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java +++ /dev/null @@ -1,232 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor.external; - -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.io.PrintWriter; -import java.util.UUID; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteFileSystem; -import org.apache.ignite.configuration.HadoopConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.igfs.IgfsOutputStream; -import org.apache.ignite.igfs.IgfsPath; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.processors.hadoop.impl.HadoopAbstractSelfTest; -import org.apache.ignite.internal.processors.hadoop.HadoopJobId; -import org.apache.ignite.internal.util.typedef.X; -import org.apache.ignite.marshaller.jdk.JdkMarshaller; - -import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.createJobInfo; - -/** - * Job tracker self test. - */ -public class HadoopExternalTaskExecutionSelfTest extends HadoopAbstractSelfTest { - /** {@inheritDoc} */ - @Override protected boolean igfsEnabled() { - return true; - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-404"); - - startGrids(gridCount()); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(); - } - - /** {@inheritDoc} */ - @Override public HadoopConfiguration hadoopConfiguration(String gridName) { - HadoopConfiguration cfg = super.hadoopConfiguration(gridName); - - // TODO: IGNITE-404: Uncomment when fixed. - //cfg.setExternalExecution(true); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - cfg.setMarshaller(new JdkMarshaller()); - - return cfg; - } - - /** - * @throws Exception If failed. - */ - public void testSimpleTaskSubmit() throws Exception { - String testInputFile = "/test"; - - prepareTestFile(testInputFile); - - Configuration cfg = new Configuration(); - - setupFileSystems(cfg); - - Job job = Job.getInstance(cfg); - - job.setMapperClass(TestMapper.class); - job.setCombinerClass(TestReducer.class); - job.setReducerClass(TestReducer.class); - - job.setMapOutputKeyClass(Text.class); - job.setMapOutputValueClass(IntWritable.class); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(IntWritable.class); - - job.setNumReduceTasks(1); - - FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/" + testInputFile)); - FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output")); - - job.setJarByClass(getClass()); - - IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1), - createJobInfo(job.getConfiguration())); - - fut.get(); - } - - /** - * @throws Exception If failed. - */ - public void testMapperException() throws Exception { - String testInputFile = "/test"; - - prepareTestFile(testInputFile); - - Configuration cfg = new Configuration(); - - setupFileSystems(cfg); - - Job job = Job.getInstance(cfg); - - job.setMapperClass(TestFailingMapper.class); - job.setCombinerClass(TestReducer.class); - job.setReducerClass(TestReducer.class); - - job.setMapOutputKeyClass(Text.class); - job.setMapOutputValueClass(IntWritable.class); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(IntWritable.class); - - job.setNumReduceTasks(1); - - FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/" + testInputFile)); - FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output")); - - job.setJarByClass(getClass()); - - IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1), - createJobInfo(job.getConfiguration())); - - try { - fut.get(); - } - catch (IgniteCheckedException e) { - IOException exp = X.cause(e, IOException.class); - - assertNotNull(exp); - assertEquals("Test failure", exp.getMessage()); - } - } - - /** - * @param filePath File path to prepare. - * @throws Exception If failed. - */ - private void prepareTestFile(String filePath) throws Exception { - IgniteFileSystem igfs = grid(0).fileSystem(igfsName); - - try (IgfsOutputStream out = igfs.create(new IgfsPath(filePath), true)) { - PrintWriter wr = new PrintWriter(new OutputStreamWriter(out)); - - for (int i = 0; i < 1000; i++) - wr.println("Hello, world: " + i); - - wr.flush(); - } - } - - /** - * - */ - private static class TestMapper extends Mapper<Object, Text, Text, IntWritable> { - /** One constant. */ - private IntWritable one = new IntWritable(1); - - /** Line constant. */ - private Text line = new Text("line"); - - @Override protected void map(Object key, Text val, Context ctx) throws IOException, InterruptedException { - ctx.write(line, one); - } - } - - /** - * Failing mapper. - */ - private static class TestFailingMapper extends Mapper<Object, Text, Text, IntWritable> { - @Override protected void map(Object key, Text val, Context c) throws IOException, InterruptedException { - throw new IOException("Test failure"); - } - } - - /** - * - */ - private static class TestReducer extends Reducer<Text, IntWritable, Text, IntWritable> { - /** Line constant. */ - private Text line = new Text("line"); - - @Override protected void setup(Context ctx) throws IOException, InterruptedException { - super.setup(ctx); - } - - /** {@inheritDoc} */ - @Override protected void reduce(Text key, Iterable<IntWritable> values, Context ctx) - throws IOException, InterruptedException { - int s = 0; - - for (IntWritable val : values) - s += val.get(); - - System.out.println(">>>> Reduced: " + s); - - ctx.write(line, new IntWritable(s)); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/4c09b38c/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunicationSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunicationSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunicationSelfTest.java deleted file mode 100644 index 851c3af..0000000 --- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunicationSelfTest.java +++ /dev/null @@ -1,220 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.ArrayList; -import java.util.Collection; -import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopProcessDescriptor; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.marshaller.Marshaller; -import org.apache.ignite.marshaller.jdk.JdkMarshaller; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; - -/** - * Tests Hadoop external communication component. - */ -public class HadoopExternalCommunicationSelfTest extends GridCommonAbstractTest { - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-404"); - } - - /** - * @throws Exception If failed. - */ - public void testSimpleMessageSendingTcp() throws Exception { - checkSimpleMessageSending(false); - } - - /** - * @throws Exception If failed. - */ - public void testSimpleMessageSendingShmem() throws Exception { - checkSimpleMessageSending(true); - } - - /** - * @throws Exception If failed. - */ - private void checkSimpleMessageSending(boolean useShmem) throws Exception { - UUID parentNodeId = UUID.randomUUID(); - - Marshaller marsh = new JdkMarshaller(); - - IgniteLogger log = log(); - - HadoopExternalCommunication[] comms = new HadoopExternalCommunication[4]; - - try { - String name = "grid"; - - TestHadoopListener[] lsnrs = new TestHadoopListener[4]; - - int msgs = 10; - - for (int i = 0; i < comms.length; i++) { - comms[i] = new HadoopExternalCommunication(parentNodeId, UUID.randomUUID(), marsh, log, - Executors.newFixedThreadPool(1), name + i); - - if (useShmem) - comms[i].setSharedMemoryPort(14000); - - lsnrs[i] = new TestHadoopListener(msgs); - - comms[i].setListener(lsnrs[i]); - - comms[i].start(); - } - - for (int r = 0; r < msgs; r++) { - for (int from = 0; from < comms.length; from++) { - for (int to = 0; to < comms.length; to++) { - if (from == to) - continue; - - comms[from].sendMessage(comms[to].localProcessDescriptor(), new TestMessage(from, to)); - } - } - } - - U.sleep(1000); - - for (TestHadoopListener lsnr : lsnrs) { - lsnr.await(3_000); - - assertEquals(String.valueOf(lsnr.messages()), msgs * (comms.length - 1), lsnr.messages().size()); - } - } - finally { - for (HadoopExternalCommunication comm : comms) { - if (comm != null) - comm.stop(); - } - } - } - - /** - * - */ - private static class TestHadoopListener implements HadoopMessageListener { - /** Received messages (array list is safe because executor has one thread). */ - private Collection<TestMessage> msgs = new ArrayList<>(); - - /** Await latch. */ - private CountDownLatch receiveLatch; - - /** - * @param msgs Number of messages to await. - */ - private TestHadoopListener(int msgs) { - receiveLatch = new CountDownLatch(msgs); - } - - /** {@inheritDoc} */ - @Override public void onMessageReceived(HadoopProcessDescriptor desc, HadoopMessage msg) { - assert msg instanceof TestMessage; - - msgs.add((TestMessage)msg); - - receiveLatch.countDown(); - } - - /** {@inheritDoc} */ - @Override public void onConnectionLost(HadoopProcessDescriptor desc) { - // No-op. - } - - /** - * @return Received messages. - */ - public Collection<TestMessage> messages() { - return msgs; - } - - /** - * @param millis Time to await. - * @throws InterruptedException If wait interrupted. - */ - public void await(int millis) throws InterruptedException { - receiveLatch.await(millis, TimeUnit.MILLISECONDS); - } - } - - /** - * - */ - private static class TestMessage implements HadoopMessage { - /** From index. */ - private int from; - - /** To index. */ - private int to; - - /** - * @param from From index. - * @param to To index. - */ - private TestMessage(int from, int to) { - this.from = from; - this.to = to; - } - - /** - * Required by {@link Externalizable}. - */ - public TestMessage() { - // No-op. - } - - /** - * @return From index. - */ - public int from() { - return from; - } - - /** - * @return To index. - */ - public int to() { - return to; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeInt(from); - out.writeInt(to); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - from = in.readInt(); - to = in.readInt(); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/4c09b38c/modules/hadoop-impl/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java index 773bec5..944e285 100644 --- a/modules/hadoop-impl/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java @@ -72,10 +72,10 @@ import org.apache.ignite.internal.processors.hadoop.impl.HadoopV2JobSelfTest; import org.apache.ignite.internal.processors.hadoop.impl.HadoopValidationSelfTest; import org.apache.ignite.internal.processors.hadoop.impl.HadoopWeightedMapReducePlannerTest; import org.apache.ignite.internal.processors.hadoop.impl.HadoopWeightedPlannerMapReduceTest; -import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopConcurrentHashMultimapSelftest; -import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopHashMapSelfTest; -import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopSkipListSelfTest; -import org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopDataStreamSelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.shuffle.collections.HadoopConcurrentHashMultimapSelftest; +import org.apache.ignite.internal.processors.hadoop.impl.shuffle.collections.HadoopHashMapSelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.shuffle.collections.HadoopSkipListSelfTest; +import org.apache.ignite.internal.processors.hadoop.impl.shuffle.streams.HadoopDataStreamSelfTest; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U;