// File: modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.hadoop.shuffle.collections;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
import org.apache.ignite.internal.util.GridRandom;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.io.GridDataInput;
import org.apache.ignite.internal.util.io.GridUnsafeDataInput;
import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
import org.apache.ignite.internal.util.typedef.X;

import static java.lang.Math.abs;
import static java.lang.Math.ceil;
import static java.lang.Math.max;

/**
 * Skip list tests.
 */
public class HadoopSkipListSelfTest extends HadoopAbstractMapTest {
    /**
     * Samples {@code HadoopSkipList.randomLevel} 10000 times and checks that the number of hits per
     * level is close to {@code all >>> (level + 1)}, i.e. that each level is hit about half as often
     * as the previous one. The last level is expected to get no hits at all.
     * <p>
     * The check is statistical and unseeded, so it can occasionally fail on a correct implementation.
     */
    public void testLevel() {
        Random rnd = new GridRandom();

        int[] levelsCnts = new int[32];

        int all = 10000;

        for (int i = 0; i < all; i++) {
            int level = HadoopSkipList.randomLevel(rnd);

            levelsCnts[level]++;
        }

        X.println("Distribution: " + Arrays.toString(levelsCnts));

        for (int level = 0; level < levelsCnts.length; level++) {
            int exp = (level + 1) == levelsCnts.length ? 0 : all >>> (level + 1);

            // Relative tolerance grows with the level: 0.72 / 32 at level 0 up to 0.72 from level 5 on.
            double precision = 0.72 / Math.max(32 >>> level, 1);

            // Absolute tolerance is never smaller than 5 hits.
            int sigma = max((int)ceil(precision * exp), 5);

            X.println("Level: " + level + " exp: " + exp + " act: " + levelsCnts[level] + " precision: " + precision +
                " sigma: " + sigma);

            assertTrue("Unexpected hit count [level=" + level + ", exp=" + exp + ", act=" + levelsCnts[level] +
                ", sigma=" + sigma + ']', abs(exp - levelsCnts[level]) <= sigma); // Sometimes fails.
        }
    }

    /**
     * Writes random key-value pairs into a skip list one at a time, re-validating the whole content
     * after every single write, and finally checks that closing the list releases all allocated memory.
     *
     * @throws Exception If failed.
     */
    public void testMapSimple() throws Exception {
        GridUnsafeMemory mem = new GridUnsafeMemory(0);

        Random rnd = new Random();

        // One of 16, 32, 64, 128, 256, 512.
        int mapSize = 16 << rnd.nextInt(6);

        HadoopJobInfo job = new JobInfo();

        HadoopTaskContext taskCtx = new TaskContext();

        HadoopMultimap m = new HadoopSkipList(job, mem);

        HadoopConcurrentHashMultimap.Adder a = m.startAdding(taskCtx);

        // Reference model of everything written so far.
        Multimap<Integer, Integer> mm = ArrayListMultimap.create();

        // Pairs reported by the visitor; accumulated across all check() calls.
        Multimap<Integer, Integer> vis = ArrayListMultimap.create();

        for (int i = 0, vals = 4 * mapSize + rnd.nextInt(25); i < vals; i++) {
            int key = rnd.nextInt(mapSize);
            int val = rnd.nextInt();

            a.write(new IntWritable(key), new IntWritable(val));
            mm.put(key, val);

            X.println("k: " + key + " v: " + val);

            // The adder is closed before every check and re-opened afterwards.
            a.close();

            check(m, mm, vis, taskCtx);

            a = m.startAdding(taskCtx);
        }

        a.close();

        X.println("Alloc: " + mem.allocatedSize());

        m.close();

        assertEquals(0, mem.allocatedSize());
    }

    /**
     * Validates the multimap against the reference model in two ways: through the task input
     * (keys strictly ascending, values per key equal to the model) and through the visitor.
     *
     * @param m Multimap under test.
     * @param mm Expected content.
     * @param vis Accumulator for pairs reported by the visitor; after the visit it must equal {@code mm}.
     * @param taskCtx Task context.
     * @throws Exception If failed.
     */
    private void check(HadoopMultimap m, Multimap<Integer, Integer> mm, final Multimap<Integer, Integer> vis,
        HadoopTaskContext taskCtx) throws Exception {
        final HadoopTaskInput in = m.input(taskCtx);

        // Close the input even when an assertion fails, so a failure does not leak it.
        try {
            Map<Integer, Collection<Integer>> mmm = mm.asMap();

            int keys = 0;

            int prevKey = Integer.MIN_VALUE;

            while (in.next()) {
                keys++;

                IntWritable k = (IntWritable)in.key();

                assertNotNull(k);

                // Keys must come out in strictly ascending order.
                assertTrue(k.get() > prevKey);

                prevKey = k.get();

                Deque<Integer> vs = new LinkedList<>();

                Iterator<?> it = in.values();

                // Values are prepended, i.e. compared with the model in reverse iteration order.
                while (it.hasNext())
                    vs.addFirst(((IntWritable)it.next()).get());

                Collection<Integer> exp = mmm.get(k.get());

                assertEquals(exp, vs);
            }

            assertEquals(mmm.size(), keys);

            // Check visitor.

            final byte[] buf = new byte[4];

            final GridDataInput dataInput = new GridUnsafeDataInput();

            // NOTE(review): 'false' presumably makes the visitor report only entries added since the
            // previous visit (vis is never cleared between calls) - confirm against HadoopMultimap.visit().
            m.visit(false, new HadoopConcurrentHashMultimap.Visitor() {
                /** Last key reported by the multimap. */
                IntWritable key = new IntWritable();

                /** Reusable value holder. */
                IntWritable val = new IntWritable();

                @Override public void onKey(long keyPtr, int keySize) {
                    read(keyPtr, keySize, key);
                }

                @Override public void onValue(long valPtr, int valSize) {
                    read(valPtr, valSize, val);

                    vis.put(key.get(), val.get());
                }

                /**
                 * Copies 4 bytes from the given address into the buffer and deserializes them.
                 *
                 * @param ptr Source address.
                 * @param size Size in bytes, must be 4.
                 * @param w Writable to read into.
                 */
                private void read(long ptr, int size, Writable w) {
                    assert size == 4 : size;

                    GridUnsafe.copyMemory(null, ptr, buf, GridUnsafe.BYTE_ARR_OFF, size);

                    dataInput.bytes(buf, size);

                    try {
                        w.readFields(dataInput);
                    }
                    catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }
            });

            assertEquals(mm, vis);
        }
        finally {
            in.close();
        }
    }

    /**
     * Fills a skip list from 3 to 29 concurrent threads (50000 random pairs each), then reads it back
     * in a single thread and verifies key ordering, that every written pair is returned exactly once,
     * and that all memory is released on close. Repeated 20 times.
     *
     * @throws Exception if failed.
     */
    public void testMultiThreaded() throws Exception {
        GridUnsafeMemory mem = new GridUnsafeMemory(0);

        X.println("___ Started");

        Random rnd = new GridRandom();

        for (int i = 0; i < 20; i++) {
            HadoopJobInfo job = new JobInfo();

            final HadoopTaskContext taskCtx = new TaskContext();

            final HadoopMultimap m = new HadoopSkipList(job, mem);

            // Reference model filled concurrently by the writers.
            final ConcurrentMap<Integer, Collection<Integer>> mm = new ConcurrentHashMap<>();

            X.println("___ MT");

            multithreaded(new Callable<Object>() {
                @Override public Object call() throws Exception {
                    X.println("___ TH in");

                    Random rnd = new GridRandom();

                    IntWritable key = new IntWritable();
                    IntWritable val = new IntWritable();

                    HadoopMultimap.Adder a = m.startAdding(taskCtx);

                    for (int i = 0; i < 50000; i++) {
                        int k = rnd.nextInt(32000);
                        int v = rnd.nextInt();

                        key.set(k);
                        val.set(v);

                        a.write(key, val);

                        Collection<Integer> list = mm.get(k);

                        if (list == null) {
                            list = new ConcurrentLinkedQueue<>();

                            Collection<Integer> old = mm.putIfAbsent(k, list);

                            // Another thread registered the key first - use its collection.
                            if (old != null)
                                list = old;
                        }

                        list.add(v);
                    }

                    a.close();

                    X.println("___ TH out");

                    return null;
                }
            }, 3 + rnd.nextInt(27));

            HadoopTaskInput in = m.input(taskCtx);

            // Close the input even when an assertion fails.
            try {
                int prevKey = Integer.MIN_VALUE;

                while (in.next()) {
                    IntWritable key = (IntWritable)in.key();

                    assertTrue(key.get() > prevKey);

                    prevKey = key.get();

                    Iterator<?> valsIter = in.values();

                    Collection<Integer> vals = mm.remove(key.get());

                    assertNotNull(vals);

                    while (valsIter.hasNext()) {
                        IntWritable val = (IntWritable)valsIter.next();

                        assertTrue(vals.remove(val.get()));
                    }

                    assertTrue(vals.isEmpty());
                }

                // Every key that was written must have been returned by the input.
                assertTrue("Keys missing from input: " + mm.keySet(), mm.isEmpty());
            }
            finally {
                in.close();
            }

            m.close();

            assertEquals(0, mem.allocatedSize());
        }
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataStreamSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataStreamSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataStreamSelfTest.java new file mode 100644 index 0000000..dd571af --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataStreamSelfTest.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.shuffle.streams; + +import java.io.IOException; +import java.util.Arrays; +import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * + */ +public class HadoopDataStreamSelfTest extends GridCommonAbstractTest { + + public void testStreams() throws IOException { + GridUnsafeMemory mem = new GridUnsafeMemory(0); + + HadoopDataOutStream out = new HadoopDataOutStream(mem); + + int size = 4 * 1024; + + final long ptr = mem.allocate(size); + + out.buffer().set(ptr, size); + + out.writeBoolean(false); + out.writeBoolean(true); + out.writeBoolean(false); + out.write(17); + out.write(121); + out.write(0xfafa); + out.writeByte(17); + out.writeByte(121); + out.writeByte(0xfafa); + out.writeChar('z'); + out.writeChar('o'); + out.writeChar('r'); + out.writeShort(100); + out.writeShort(Short.MIN_VALUE); + out.writeShort(Short.MAX_VALUE); + out.writeShort(65535); + out.writeShort(65536); // 0 + out.writeInt(Integer.MAX_VALUE); + out.writeInt(Integer.MIN_VALUE); + out.writeInt(-1); + out.writeInt(0); + out.writeInt(1); + out.writeFloat(0.33f); + out.writeFloat(0.5f); + out.writeFloat(-0.7f); + out.writeFloat(Float.MAX_VALUE); + out.writeFloat(Float.MIN_VALUE); + out.writeFloat(Float.MIN_NORMAL); + out.writeFloat(Float.POSITIVE_INFINITY); + out.writeFloat(Float.NEGATIVE_INFINITY); + out.writeFloat(Float.NaN); + out.writeDouble(-12312312.3333333336666779); + out.writeDouble(123123.234); + out.writeDouble(Double.MAX_VALUE); + out.writeDouble(Double.MIN_VALUE); + out.writeDouble(Double.MIN_NORMAL); + out.writeDouble(Double.NEGATIVE_INFINITY); + out.writeDouble(Double.POSITIVE_INFINITY); + out.writeDouble(Double.NaN); + out.writeLong(Long.MAX_VALUE); + out.writeLong(Long.MIN_VALUE); + out.writeLong(0); + out.writeLong(-1L); + out.write(new byte[]{1,2,3}); + out.write(new byte[]{0,1,2,3}, 1, 2); + out.writeUTF("mom washes 
rum"); + + HadoopDataInStream in = new HadoopDataInStream(mem); + + in.buffer().set(ptr, out.buffer().pointer()); + + assertEquals(false, in.readBoolean()); + assertEquals(true, in.readBoolean()); + assertEquals(false, in.readBoolean()); + assertEquals(17, in.read()); + assertEquals(121, in.read()); + assertEquals(0xfa, in.read()); + assertEquals(17, in.readByte()); + assertEquals(121, in.readByte()); + assertEquals((byte)0xfa, in.readByte()); + assertEquals('z', in.readChar()); + assertEquals('o', in.readChar()); + assertEquals('r', in.readChar()); + assertEquals(100, in.readShort()); + assertEquals(Short.MIN_VALUE, in.readShort()); + assertEquals(Short.MAX_VALUE, in.readShort()); + assertEquals(-1, in.readShort()); + assertEquals(0, in.readShort()); + assertEquals(Integer.MAX_VALUE, in.readInt()); + assertEquals(Integer.MIN_VALUE, in.readInt()); + assertEquals(-1, in.readInt()); + assertEquals(0, in.readInt()); + assertEquals(1, in.readInt()); + assertEquals(0.33f, in.readFloat()); + assertEquals(0.5f, in.readFloat()); + assertEquals(-0.7f, in.readFloat()); + assertEquals(Float.MAX_VALUE, in.readFloat()); + assertEquals(Float.MIN_VALUE, in.readFloat()); + assertEquals(Float.MIN_NORMAL, in.readFloat()); + assertEquals(Float.POSITIVE_INFINITY, in.readFloat()); + assertEquals(Float.NEGATIVE_INFINITY, in.readFloat()); + assertEquals(Float.NaN, in.readFloat()); + assertEquals(-12312312.3333333336666779, in.readDouble()); + assertEquals(123123.234, in.readDouble()); + assertEquals(Double.MAX_VALUE, in.readDouble()); + assertEquals(Double.MIN_VALUE, in.readDouble()); + assertEquals(Double.MIN_NORMAL, in.readDouble()); + assertEquals(Double.NEGATIVE_INFINITY, in.readDouble()); + assertEquals(Double.POSITIVE_INFINITY, in.readDouble()); + assertEquals(Double.NaN, in.readDouble()); + assertEquals(Long.MAX_VALUE, in.readLong()); + assertEquals(Long.MIN_VALUE, in.readLong()); + assertEquals(0, in.readLong()); + assertEquals(-1, in.readLong()); + + byte[] b = new byte[3]; + + 
in.read(b); + + assertTrue(Arrays.equals(new byte[]{1,2,3}, b)); + + b = new byte[4]; + + in.read(b, 1, 2); + + assertTrue(Arrays.equals(new byte[]{0, 1, 2, 0}, b)); + + assertEquals("mom washes rum", in.readUTF()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorServiceTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorServiceTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorServiceTest.java new file mode 100644 index 0000000..7dd045a --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorServiceTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.taskexecutor; + +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jsr166.LongAdder8; + +/** + * + */ +public class HadoopExecutorServiceTest extends GridCommonAbstractTest { + /** + * @throws Exception If failed. + */ + public void testExecutesAll() throws Exception { + final HadoopExecutorService exec = new HadoopExecutorService(log, "_GRID_NAME_", 10, 5); + + for (int i = 0; i < 5; i++) { + final int loops = 5000; + int threads = 17; + + final LongAdder8 sum = new LongAdder8(); + + multithreaded(new Callable<Object>() { + @Override public Object call() throws Exception { + for (int i = 0; i < loops; i++) { + exec.submit(new Callable<Void>() { + @Override + public Void call() throws Exception { + sum.increment(); + + return null; + } + }); + } + + return null; + } + }, threads); + + while (exec.active() != 0) { + X.println("__ active: " + exec.active()); + + Thread.sleep(200); + } + + assertEquals(threads * loops, sum.sum()); + + X.println("_ ok"); + } + + assertTrue(exec.shutdown(0)); + } + + /** + * @throws Exception If failed. 
+ */ + public void testShutdown() throws Exception { + for (int i = 0; i < 5; i++) { + final HadoopExecutorService exec = new HadoopExecutorService(log, "_GRID_NAME_", 10, 5); + + final LongAdder8 sum = new LongAdder8(); + + final AtomicBoolean finish = new AtomicBoolean(); + + IgniteInternalFuture<?> fut = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + while (!finish.get()) { + exec.submit(new Callable<Void>() { + @Override public Void call() throws Exception { + sum.increment(); + + return null; + } + }); + } + + return null; + } + }, 19); + + Thread.sleep(200); + + assertTrue(exec.shutdown(50)); + + long res = sum.sum(); + + assertTrue(res > 0); + + finish.set(true); + + fut.get(); + + assertEquals(res, sum.sum()); // Nothing was executed after shutdown. + + X.println("_ ok"); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java new file mode 100644 index 0000000..ec33836 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.taskexecutor.external; + +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.util.UUID; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteFileSystem; +import org.apache.ignite.configuration.HadoopConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.igfs.IgfsOutputStream; +import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.hadoop.HadoopAbstractSelfTest; +import org.apache.ignite.internal.processors.hadoop.HadoopJobId; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.marshaller.jdk.JdkMarshaller; + +import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.createJobInfo; + +/** + * Job tracker self test. 
+ */ +public class HadoopExternalTaskExecutionSelfTest extends HadoopAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected boolean igfsEnabled() { + return true; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-404"); + + startGrids(gridCount()); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override public HadoopConfiguration hadoopConfiguration(String gridName) { + HadoopConfiguration cfg = super.hadoopConfiguration(gridName); + + // TODO: IGNITE-404: Uncomment when fixed. + //cfg.setExternalExecution(true); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setMarshaller(new JdkMarshaller()); + + return cfg; + } + + /** + * @throws Exception If failed. 
+ */ + public void testSimpleTaskSubmit() throws Exception { + String testInputFile = "/test"; + + prepareTestFile(testInputFile); + + Configuration cfg = new Configuration(); + + setupFileSystems(cfg); + + Job job = Job.getInstance(cfg); + + job.setMapperClass(TestMapper.class); + job.setCombinerClass(TestReducer.class); + job.setReducerClass(TestReducer.class); + + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(IntWritable.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + + job.setNumReduceTasks(1); + + FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/" + testInputFile)); + FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output")); + + job.setJarByClass(getClass()); + + IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1), + createJobInfo(job.getConfiguration())); + + fut.get(); + } + + /** + * @throws Exception If failed. 
+ */ + public void testMapperException() throws Exception { + String testInputFile = "/test"; + + prepareTestFile(testInputFile); + + Configuration cfg = new Configuration(); + + setupFileSystems(cfg); + + Job job = Job.getInstance(cfg); + + job.setMapperClass(TestFailingMapper.class); + job.setCombinerClass(TestReducer.class); + job.setReducerClass(TestReducer.class); + + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(IntWritable.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + + job.setNumReduceTasks(1); + + FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/" + testInputFile)); + FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output")); + + job.setJarByClass(getClass()); + + IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1), + createJobInfo(job.getConfiguration())); + + try { + fut.get(); + } + catch (IgniteCheckedException e) { + IOException exp = X.cause(e, IOException.class); + + assertNotNull(exp); + assertEquals("Test failure", exp.getMessage()); + } + } + + /** + * @param filePath File path to prepare. + * @throws Exception If failed. + */ + private void prepareTestFile(String filePath) throws Exception { + IgniteFileSystem igfs = grid(0).fileSystem(igfsName); + + try (IgfsOutputStream out = igfs.create(new IgfsPath(filePath), true)) { + PrintWriter wr = new PrintWriter(new OutputStreamWriter(out)); + + for (int i = 0; i < 1000; i++) + wr.println("Hello, world: " + i); + + wr.flush(); + } + } + + /** + * + */ + private static class TestMapper extends Mapper<Object, Text, Text, IntWritable> { + /** One constant. */ + private IntWritable one = new IntWritable(1); + + /** Line constant. 
*/ + private Text line = new Text("line"); + + @Override protected void map(Object key, Text val, Context ctx) throws IOException, InterruptedException { + ctx.write(line, one); + } + } + + /** + * Failing mapper. + */ + private static class TestFailingMapper extends Mapper<Object, Text, Text, IntWritable> { + @Override protected void map(Object key, Text val, Context c) throws IOException, InterruptedException { + throw new IOException("Test failure"); + } + } + + /** + * + */ + private static class TestReducer extends Reducer<Text, IntWritable, Text, IntWritable> { + /** Line constant. */ + private Text line = new Text("line"); + + @Override protected void setup(Context ctx) throws IOException, InterruptedException { + super.setup(ctx); + } + + /** {@inheritDoc} */ + @Override protected void reduce(Text key, Iterable<IntWritable> values, Context ctx) + throws IOException, InterruptedException { + int s = 0; + + for (IntWritable val : values) + s += val.get(); + + System.out.println(">>>> Reduced: " + s); + + ctx.write(line, new IntWritable(s)); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunicationSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunicationSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunicationSelfTest.java new file mode 100644 index 0000000..851c3af --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunicationSelfTest.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.ArrayList; +import java.util.Collection; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage; +import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopProcessDescriptor; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.marshaller.jdk.JdkMarshaller; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * Tests Hadoop external communication component. + */ +public class HadoopExternalCommunicationSelfTest extends GridCommonAbstractTest { + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-404"); + } + + /** + * @throws Exception If failed. 
+ */ + public void testSimpleMessageSendingTcp() throws Exception { + checkSimpleMessageSending(false); + } + + /** + * @throws Exception If failed. + */ + public void testSimpleMessageSendingShmem() throws Exception { + checkSimpleMessageSending(true); + } + + /** + * @throws Exception If failed. + */ + private void checkSimpleMessageSending(boolean useShmem) throws Exception { + UUID parentNodeId = UUID.randomUUID(); + + Marshaller marsh = new JdkMarshaller(); + + IgniteLogger log = log(); + + HadoopExternalCommunication[] comms = new HadoopExternalCommunication[4]; + + try { + String name = "grid"; + + TestHadoopListener[] lsnrs = new TestHadoopListener[4]; + + int msgs = 10; + + for (int i = 0; i < comms.length; i++) { + comms[i] = new HadoopExternalCommunication(parentNodeId, UUID.randomUUID(), marsh, log, + Executors.newFixedThreadPool(1), name + i); + + if (useShmem) + comms[i].setSharedMemoryPort(14000); + + lsnrs[i] = new TestHadoopListener(msgs); + + comms[i].setListener(lsnrs[i]); + + comms[i].start(); + } + + for (int r = 0; r < msgs; r++) { + for (int from = 0; from < comms.length; from++) { + for (int to = 0; to < comms.length; to++) { + if (from == to) + continue; + + comms[from].sendMessage(comms[to].localProcessDescriptor(), new TestMessage(from, to)); + } + } + } + + U.sleep(1000); + + for (TestHadoopListener lsnr : lsnrs) { + lsnr.await(3_000); + + assertEquals(String.valueOf(lsnr.messages()), msgs * (comms.length - 1), lsnr.messages().size()); + } + } + finally { + for (HadoopExternalCommunication comm : comms) { + if (comm != null) + comm.stop(); + } + } + } + + /** + * + */ + private static class TestHadoopListener implements HadoopMessageListener { + /** Received messages (array list is safe because executor has one thread). */ + private Collection<TestMessage> msgs = new ArrayList<>(); + + /** Await latch. */ + private CountDownLatch receiveLatch; + + /** + * @param msgs Number of messages to await. 
+ */ + private TestHadoopListener(int msgs) { + receiveLatch = new CountDownLatch(msgs); + } + + /** {@inheritDoc} */ + @Override public void onMessageReceived(HadoopProcessDescriptor desc, HadoopMessage msg) { + assert msg instanceof TestMessage; + + msgs.add((TestMessage)msg); + + receiveLatch.countDown(); + } + + /** {@inheritDoc} */ + @Override public void onConnectionLost(HadoopProcessDescriptor desc) { + // No-op. + } + + /** + * @return Received messages. + */ + public Collection<TestMessage> messages() { + return msgs; + } + + /** + * @param millis Time to await. + * @throws InterruptedException If wait interrupted. + */ + public void await(int millis) throws InterruptedException { + receiveLatch.await(millis, TimeUnit.MILLISECONDS); + } + } + + /** + * + */ + private static class TestMessage implements HadoopMessage { + /** From index. */ + private int from; + + /** To index. */ + private int to; + + /** + * @param from From index. + * @param to To index. + */ + private TestMessage(int from, int to) { + this.from = from; + this.to = to; + } + + /** + * Required by {@link Externalizable}. + */ + public TestMessage() { + // No-op. + } + + /** + * @return From index. + */ + public int from() { + return from; + } + + /** + * @return To index. 
+ */ + public int to() { + return to; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeInt(from); + out.writeInt(to); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + from = in.readInt(); + to = in.readInt(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java new file mode 100644 index 0000000..603fd5b --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java @@ -0,0 +1,354 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.testsuites; + +import junit.framework.TestSuite; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; +import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.client.hadoop.HadoopClientProtocolEmbeddedSelfTest; +import org.apache.ignite.client.hadoop.HadoopClientProtocolSelfTest; +import org.apache.ignite.hadoop.cache.HadoopTxConfigCacheTest; +import org.apache.ignite.hadoop.fs.KerberosHadoopFileSystemFactorySelfTest; +import org.apache.ignite.hadoop.util.BasicUserNameMapperSelfTest; +import org.apache.ignite.hadoop.util.ChainedUserNameMapperSelfTest; +import org.apache.ignite.hadoop.util.KerberosUserNameMapperSelfTest; +import org.apache.ignite.igfs.Hadoop1OverIgfsDualAsyncTest; +import org.apache.ignite.igfs.Hadoop1OverIgfsDualSyncTest; +import org.apache.ignite.igfs.HadoopFIleSystemFactorySelfTest; +import org.apache.ignite.igfs.HadoopIgfs20FileSystemLoopbackPrimarySelfTest; +import org.apache.ignite.igfs.HadoopIgfsDualAsyncSelfTest; +import org.apache.ignite.igfs.HadoopIgfsDualSyncSelfTest; +import org.apache.ignite.igfs.HadoopSecondaryFileSystemConfigurationTest; +import org.apache.ignite.igfs.IgfsEventsTestSuite; +import org.apache.ignite.igfs.IgniteHadoopFileSystemClientSelfTest; +import org.apache.ignite.igfs.IgniteHadoopFileSystemHandshakeSelfTest; +import org.apache.ignite.igfs.IgniteHadoopFileSystemLoggerSelfTest; +import org.apache.ignite.igfs.IgniteHadoopFileSystemLoggerStateSelfTest; +import org.apache.ignite.igfs.IgniteHadoopFileSystemLoopbackEmbeddedDualAsyncSelfTest; +import org.apache.ignite.igfs.IgniteHadoopFileSystemLoopbackEmbeddedDualSyncSelfTest; +import org.apache.ignite.igfs.IgniteHadoopFileSystemLoopbackEmbeddedPrimarySelfTest; +import org.apache.ignite.igfs.IgniteHadoopFileSystemLoopbackEmbeddedSecondarySelfTest; +import 
org.apache.ignite.igfs.IgniteHadoopFileSystemLoopbackExternalDualAsyncSelfTest; +import org.apache.ignite.igfs.IgniteHadoopFileSystemLoopbackExternalDualSyncSelfTest; +import org.apache.ignite.igfs.IgniteHadoopFileSystemLoopbackExternalPrimarySelfTest; +import org.apache.ignite.igfs.IgniteHadoopFileSystemLoopbackExternalSecondarySelfTest; +import org.apache.ignite.igfs.IgniteHadoopFileSystemSecondaryFileSystemInitializationSelfTest; +import org.apache.ignite.internal.processors.hadoop.HadoopClassLoaderTest; +import org.apache.ignite.internal.processors.hadoop.HadoopCommandLineTest; +import org.apache.ignite.internal.processors.hadoop.HadoopDefaultMapReducePlannerSelfTest; +import org.apache.ignite.internal.processors.hadoop.HadoopFileSystemsTest; +import org.apache.ignite.internal.processors.hadoop.HadoopGroupingTest; +import org.apache.ignite.internal.processors.hadoop.HadoopJobTrackerSelfTest; +import org.apache.ignite.internal.processors.hadoop.HadoopMapReduceEmbeddedSelfTest; +import org.apache.ignite.internal.processors.hadoop.HadoopMapReduceErrorResilienceTest; +import org.apache.ignite.internal.processors.hadoop.HadoopMapReduceTest; +import org.apache.ignite.internal.processors.hadoop.HadoopNoHadoopMapReduceTest; +import org.apache.ignite.internal.processors.hadoop.HadoopSerializationWrapperSelfTest; +import org.apache.ignite.internal.processors.hadoop.HadoopSnappyFullMapReduceTest; +import org.apache.ignite.internal.processors.hadoop.HadoopSnappyTest; +import org.apache.ignite.internal.processors.hadoop.HadoopSortingTest; +import org.apache.ignite.internal.processors.hadoop.HadoopSplitWrapperSelfTest; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskExecutionSelfTest; +import org.apache.ignite.internal.processors.hadoop.HadoopTasksV1Test; +import org.apache.ignite.internal.processors.hadoop.HadoopTasksV2Test; +import org.apache.ignite.internal.processors.hadoop.HadoopUserLibsSelfTest; +import 
org.apache.ignite.internal.processors.hadoop.HadoopV2JobSelfTest; +import org.apache.ignite.internal.processors.hadoop.HadoopValidationSelfTest; +import org.apache.ignite.internal.processors.hadoop.HadoopWeightedMapReducePlannerTest; +import org.apache.ignite.internal.processors.hadoop.HadoopWeightedPlannerMapReduceTest; +import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopConcurrentHashMultimapSelftest; +import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopHashMapSelfTest; +import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopSkipListSelfTest; +import org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopDataStreamSelfTest; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.U; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.URL; +import java.net.URLConnection; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; + +import static org.apache.ignite.testframework.GridTestUtils.modeToPermissionSet; + +/** + * Test suite for Hadoop Map Reduce engine. + */ +public class IgniteHadoopTestSuite extends TestSuite { + /** + * @return Test suite. + * @throws Exception Thrown in case of the failure. 
+ */ + public static TestSuite suite() throws Exception { + downloadHadoop(); + downloadHive(); + + final ClassLoader ldr = TestSuite.class.getClassLoader(); + + TestSuite suite = new TestSuite("Ignite Hadoop MR Test Suite"); + + suite.addTest(new TestSuite(ldr.loadClass(HadoopUserLibsSelfTest.class.getName()))); + + suite.addTest(new TestSuite(ldr.loadClass(HadoopDefaultMapReducePlannerSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopWeightedMapReducePlannerTest.class.getName()))); + + suite.addTest(new TestSuite(ldr.loadClass(BasicUserNameMapperSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(KerberosUserNameMapperSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(ChainedUserNameMapperSelfTest.class.getName()))); + + suite.addTest(new TestSuite(ldr.loadClass(KerberosHadoopFileSystemFactorySelfTest.class.getName()))); + + suite.addTest(new TestSuite(ldr.loadClass(HadoopSnappyTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopSnappyFullMapReduceTest.class.getName()))); + + suite.addTest(new TestSuite(ldr.loadClass(HadoopClassLoaderTest.class.getName()))); + + suite.addTest(new TestSuite(ldr.loadClass(HadoopIgfs20FileSystemLoopbackPrimarySelfTest.class.getName()))); + + suite.addTest(new TestSuite(ldr.loadClass(HadoopIgfsDualSyncSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopIgfsDualAsyncSelfTest.class.getName()))); + + suite.addTest(new TestSuite(ldr.loadClass(Hadoop1OverIgfsDualSyncTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(Hadoop1OverIgfsDualAsyncTest.class.getName()))); + + suite.addTest(new TestSuite(ldr.loadClass(HadoopFIleSystemFactorySelfTest.class.getName()))); + + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemLoopbackExternalPrimarySelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemLoopbackExternalSecondarySelfTest.class.getName()))); + 
suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemLoopbackExternalDualSyncSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemLoopbackExternalDualAsyncSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemLoopbackEmbeddedPrimarySelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemLoopbackEmbeddedSecondarySelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemLoopbackEmbeddedDualSyncSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemLoopbackEmbeddedDualAsyncSelfTest.class.getName()))); + + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemSecondaryFileSystemInitializationSelfTest.class.getName()))); + + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemClientSelfTest.class.getName()))); + + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemLoggerStateSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemLoggerSelfTest.class.getName()))); + + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemHandshakeSelfTest.class.getName()))); + + suite.addTest(IgfsEventsTestSuite.suiteNoarchOnly()); + + suite.addTest(new TestSuite(ldr.loadClass(HadoopFileSystemsTest.class.getName()))); + + suite.addTest(new TestSuite(ldr.loadClass(HadoopValidationSelfTest.class.getName()))); + + suite.addTest(new TestSuite(ldr.loadClass(HadoopJobTrackerSelfTest.class.getName()))); + + suite.addTest(new TestSuite(ldr.loadClass(HadoopHashMapSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopDataStreamSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopConcurrentHashMultimapSelftest.class.getName()))); + + suite.addTest(new TestSuite(ldr.loadClass(HadoopSkipListSelfTest.class.getName()))); + + suite.addTest(new 
TestSuite(ldr.loadClass(HadoopTaskExecutionSelfTest.class.getName()))); + + suite.addTest(new TestSuite(ldr.loadClass(HadoopV2JobSelfTest.class.getName()))); + + suite.addTest(new TestSuite(ldr.loadClass(HadoopSerializationWrapperSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopSplitWrapperSelfTest.class.getName()))); + + suite.addTest(new TestSuite(ldr.loadClass(HadoopTasksV1Test.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopTasksV2Test.class.getName()))); + + suite.addTest(new TestSuite(ldr.loadClass(HadoopMapReduceTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopWeightedPlannerMapReduceTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopNoHadoopMapReduceTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopMapReduceErrorResilienceTest.class.getName()))); + + suite.addTest(new TestSuite(ldr.loadClass(HadoopMapReduceEmbeddedSelfTest.class.getName()))); + + suite.addTest(new TestSuite(ldr.loadClass(HadoopSortingTest.class.getName()))); + +// suite.addTest(new TestSuite(ldr.loadClass(HadoopExternalTaskExecutionSelfTest.class.getName()))); +// suite.addTest(new TestSuite(ldr.loadClass(HadoopExternalCommunicationSelfTest.class.getName()))); +// suite.addTest(new TestSuite(ldr.loadClass(HadoopSortingExternalTest.class.getName()))); + + suite.addTest(new TestSuite(ldr.loadClass(HadoopGroupingTest.class.getName()))); + + suite.addTest(new TestSuite(ldr.loadClass(HadoopClientProtocolSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopClientProtocolEmbeddedSelfTest.class.getName()))); + + suite.addTest(new TestSuite(ldr.loadClass(HadoopCommandLineTest.class.getName()))); + + suite.addTest(new TestSuite(ldr.loadClass(HadoopSecondaryFileSystemConfigurationTest.class.getName()))); + + suite.addTest(new TestSuite(ldr.loadClass(HadoopTxConfigCacheTest.class.getName()))); + + return suite; + } + + /** + * @throws Exception If 
failed. + */ + public static void downloadHive() throws Exception { + String ver = IgniteSystemProperties.getString("hive.version", "1.2.1"); + + X.println("Will use Hive version: " + ver); + + String downloadPath = "hive/hive-" + ver + "/apache-hive-" + ver + "-bin.tar.gz"; + + download("Hive", "HIVE_HOME", downloadPath, "apache-hive-" + ver + "-bin"); + } + + /** + * @throws Exception If failed. + */ + public static void downloadHadoop() throws Exception { + String ver = IgniteSystemProperties.getString("hadoop.version", "2.4.1"); + + X.println("Will use Hadoop version: " + ver); + + String downloadPath = "hadoop/core/hadoop-" + ver + "/hadoop-" + ver + ".tar.gz"; + + download("Hadoop", "HADOOP_HOME", downloadPath, "hadoop-" + ver); + } + + /** + * Downloads and extracts an Apache product. + * + * @param appName Name of application for log messages. + * @param homeVariable Pointer to home directory of the component. + * @param downloadPath Relative download path of tar package. + * @param destName Local directory name to install component. + * @throws Exception If failed. 
+ */ + private static void download(String appName, String homeVariable, String downloadPath, String destName) + throws Exception { + String homeVal = IgniteSystemProperties.getString(homeVariable); + + if (!F.isEmpty(homeVal) && new File(homeVal).isDirectory()) { + X.println(homeVariable + " is set to: " + homeVal); + + return; + } + + List<String> urls = F.asList( + "http://archive.apache.org/dist/", + "http://apache-mirror.rbc.ru/pub/apache/", + "http://www.eu.apache.org/dist/", + "http://www.us.apache.org/dist/"); + + String tmpPath = System.getProperty("java.io.tmpdir"); + + X.println("tmp: " + tmpPath); + + final File install = new File(tmpPath + File.separatorChar + "__hadoop"); + + final File home = new File(install, destName); + + X.println("Setting " + homeVariable + " to " + home.getAbsolutePath()); + + System.setProperty(homeVariable, home.getAbsolutePath()); + + final File successFile = new File(home, "__success"); + + if (home.exists()) { + if (successFile.exists()) { + X.println(appName + " distribution already exists."); + + return; + } + + X.println(appName + " distribution is invalid and it will be deleted."); + + if (!U.delete(home)) + throw new IOException("Failed to delete directory: " + home.getAbsolutePath()); + } + + for (String url : urls) { + if (!(install.exists() || install.mkdirs())) + throw new IOException("Failed to create directory: " + install.getAbsolutePath()); + + URL u = new URL(url + downloadPath); + + X.println("Attempting to download from: " + u); + + try { + URLConnection c = u.openConnection(); + + c.connect(); + + try (TarArchiveInputStream in = new TarArchiveInputStream(new GzipCompressorInputStream( + new BufferedInputStream(c.getInputStream(), 32 * 1024)))) { + + TarArchiveEntry entry; + + while ((entry = in.getNextTarEntry()) != null) { + File dest = new File(install, entry.getName()); + + if (entry.isDirectory()) { + if (!dest.mkdirs()) + throw new IllegalStateException(); + } + else if (entry.isSymbolicLink()) { + // 
Important: in Hadoop installation there are symlinks, we need to create them: + Path theLinkItself = Paths.get(install.getAbsolutePath(), entry.getName()); + + Path linkTarget = Paths.get(entry.getLinkName()); + + Files.createSymbolicLink(theLinkItself, linkTarget); + } + else { + File parent = dest.getParentFile(); + + if (!(parent.exists() || parent.mkdirs())) + throw new IllegalStateException(); + + X.print(" [" + dest); + + try (BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(dest, false), + 128 * 1024)) { + U.copy(in, out); + + out.flush(); + } + + Files.setPosixFilePermissions(dest.toPath(), modeToPermissionSet(entry.getMode())); + + X.println("]"); + } + } + } + + if (successFile.createNewFile()) + return; + } + catch (Exception e) { + e.printStackTrace(); + + U.delete(home); + } + } + + throw new IllegalStateException("Failed to install " + appName + "."); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/test/java/org/apache/ignite/testsuites/IgniteIgfsLinuxAndMacOSTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/testsuites/IgniteIgfsLinuxAndMacOSTestSuite.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/testsuites/IgniteIgfsLinuxAndMacOSTestSuite.java new file mode 100644 index 0000000..4ed1d65 --- /dev/null +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/testsuites/IgniteIgfsLinuxAndMacOSTestSuite.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.testsuites; + +import junit.framework.TestSuite; +import org.apache.ignite.igfs.HadoopIgfs20FileSystemShmemPrimarySelfTest; +import org.apache.ignite.igfs.IgfsEventsTestSuite; +import org.apache.ignite.igfs.IgniteHadoopFileSystemIpcCacheSelfTest; +import org.apache.ignite.igfs.IgniteHadoopFileSystemShmemEmbeddedDualAsyncSelfTest; +import org.apache.ignite.igfs.IgniteHadoopFileSystemShmemEmbeddedDualSyncSelfTest; +import org.apache.ignite.igfs.IgniteHadoopFileSystemShmemEmbeddedPrimarySelfTest; +import org.apache.ignite.igfs.IgniteHadoopFileSystemShmemEmbeddedSecondarySelfTest; +import org.apache.ignite.igfs.IgniteHadoopFileSystemShmemExternalDualAsyncSelfTest; +import org.apache.ignite.igfs.IgniteHadoopFileSystemShmemExternalDualSyncSelfTest; +import org.apache.ignite.igfs.IgniteHadoopFileSystemShmemExternalPrimarySelfTest; +import org.apache.ignite.igfs.IgniteHadoopFileSystemShmemExternalSecondarySelfTest; +import org.apache.ignite.internal.processors.igfs.IgfsServerManagerIpcEndpointRegistrationOnLinuxAndMacSelfTest; + +import static org.apache.ignite.testsuites.IgniteHadoopTestSuite.downloadHadoop; + +/** + * Test suite for Hadoop file system over Ignite cache. + * Contains tests which works on Linux and Mac OS platform only. + */ +public class IgniteIgfsLinuxAndMacOSTestSuite extends TestSuite { + /** + * @return Test suite. + * @throws Exception Thrown in case of the failure. 
+ */ + public static TestSuite suite() throws Exception { + downloadHadoop(); + + ClassLoader ldr = TestSuite.class.getClassLoader(); + + TestSuite suite = new TestSuite("Ignite IGFS Test Suite For Linux And Mac OS"); + + suite.addTest(new TestSuite(ldr.loadClass(IgfsServerManagerIpcEndpointRegistrationOnLinuxAndMacSelfTest.class.getName()))); + + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemShmemExternalPrimarySelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemShmemExternalSecondarySelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemShmemExternalDualSyncSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemShmemExternalDualAsyncSelfTest.class.getName()))); + + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemShmemEmbeddedPrimarySelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemShmemEmbeddedSecondarySelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemShmemEmbeddedDualSyncSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemShmemEmbeddedDualAsyncSelfTest.class.getName()))); + + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemIpcCacheSelfTest.class.getName()))); + + suite.addTest(new TestSuite(ldr.loadClass(HadoopIgfs20FileSystemShmemPrimarySelfTest.class.getName()))); + + suite.addTest(IgfsEventsTestSuite.suite()); + + return suite; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop/pom.xml ---------------------------------------------------------------------- diff --git a/modules/hadoop/pom.xml b/modules/hadoop/pom.xml index a3f40e5..70b8d03 100644 --- a/modules/hadoop/pom.xml +++ b/modules/hadoop/pom.xml @@ -54,42 +54,6 @@ </dependency> <dependency> - <groupId>org.apache.hadoop</groupId> - 
<artifactId>hadoop-annotations</artifactId> - <version>${hadoop.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-auth</artifactId> - <version>${hadoop.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <version>${hadoop.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - <version>${hadoop.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-common</artifactId> - <version>${hadoop.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-core</artifactId> - <version>${hadoop.version}</version> - </dependency> - - <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java deleted file mode 100644 index a01bfaf..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java +++ /dev/null @@ -1,275 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.hadoop.fs; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.ignite.IgniteException; -import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem; -import org.apache.ignite.hadoop.util.KerberosUserNameMapper; -import org.apache.ignite.hadoop.util.UserNameMapper; -import org.apache.ignite.internal.processors.hadoop.HadoopUtils; -import org.apache.ignite.internal.processors.igfs.IgfsUtils; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lifecycle.LifecycleAware; -import org.jetbrains.annotations.Nullable; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.URL; -import java.util.Arrays; - -/** - * Simple Hadoop file system factory which delegates to {@code FileSystem.get()} on each call. - * <p> - * If {@code "fs.[prefix].impl.disable.cache"} is set to {@code true}, file system instances will be cached by Hadoop. - */ -public class BasicHadoopFileSystemFactory implements HadoopFileSystemFactory, Externalizable, LifecycleAware { - /** */ - private static final long serialVersionUID = 0L; - - /** File system URI. */ - private String uri; - - /** File system config paths. */ - private String[] cfgPaths; - - /** User name mapper. */ - private UserNameMapper usrNameMapper; - - /** Configuration of the secondary filesystem, never null. 
*/ - protected transient Configuration cfg; - - /** Resulting URI. */ - protected transient URI fullUri; - - /** - * Constructor. - */ - public BasicHadoopFileSystemFactory() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public final FileSystem get(String name) throws IOException { - String name0 = IgfsUtils.fixUserName(name); - - if (usrNameMapper != null) - name0 = IgfsUtils.fixUserName(usrNameMapper.map(name0)); - - return getWithMappedName(name0); - } - - /** - * Internal file system create routine. - * - * @param usrName User name. - * @return File system. - * @throws IOException If failed. - */ - protected FileSystem getWithMappedName(String usrName) throws IOException { - assert cfg != null; - - try { - // FileSystem.get() might delegate to ServiceLoader to get the list of file system implementation. - // And ServiceLoader is known to be sensitive to context classloader. Therefore, we change context - // classloader to classloader of current class to avoid strange class-cast-exceptions. - ClassLoader oldLdr = HadoopUtils.setContextClassLoader(getClass().getClassLoader()); - - try { - return create(usrName); - } - finally { - HadoopUtils.restoreContextClassLoader(oldLdr); - } - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - throw new IOException("Failed to create file system due to interrupt.", e); - } - } - - /** - * Internal file system creation routine, invoked in correct class loader context. - * - * @param usrName User name. - * @return File system. - * @throws IOException If failed. - * @throws InterruptedException if the current thread is interrupted. - */ - protected FileSystem create(String usrName) throws IOException, InterruptedException { - return FileSystem.get(fullUri, cfg, usrName); - } - - /** - * Gets file system URI. - * <p> - * This URI will be used as a first argument when calling {@link FileSystem#get(URI, Configuration, String)}. 
- * <p> - * If not set, default URI will be picked from file system configuration using - * {@link FileSystem#getDefaultUri(Configuration)} method. - * - * @return File system URI. - */ - @Nullable public String getUri() { - return uri; - } - - /** - * Sets file system URI. See {@link #getUri()} for more information. - * - * @param uri File system URI. - */ - public void setUri(@Nullable String uri) { - this.uri = uri; - } - - /** - * Gets paths to additional file system configuration files (e.g. core-site.xml). - * <p> - * Path could be either absolute or relative to {@code IGNITE_HOME} environment variable. - * <p> - * All provided paths will be loaded in the order they provided and then applied to {@link Configuration}. It means - * that path order might be important in some cases. - * <p> - * <b>NOTE!</b> Factory can be serialized and transferred to other machines where instance of - * {@link IgniteHadoopFileSystem} resides. Corresponding paths must exist on these machines as well. - * - * @return Paths to file system configuration files. - */ - @Nullable public String[] getConfigPaths() { - return cfgPaths; - } - - /** - * Set paths to additional file system configuration files (e.g. core-site.xml). See {@link #getConfigPaths()} for - * more information. - * - * @param cfgPaths Paths to file system configuration files. - */ - public void setConfigPaths(@Nullable String... cfgPaths) { - this.cfgPaths = cfgPaths; - } - - /** - * Get optional user name mapper. - * <p> - * When IGFS is invoked from Hadoop, user name is passed along the way to ensure that request will be performed - * with proper user context. User name is passed in a simple form and doesn't contain any extended information, - * such as host, domain or Kerberos realm. You may use name mapper to translate plain user name to full user - * name required by security engine of the underlying file system. 
- * <p> - * For example you may want to use {@link KerberosUserNameMapper} to user name from {@code "johndoe"} to - * {@code "john...@your.realm.com"}. - * - * @return User name mapper. - */ - @Nullable public UserNameMapper getUserNameMapper() { - return usrNameMapper; - } - - /** - * Set optional user name mapper. See {@link #getUserNameMapper()} for more information. - * - * @param usrNameMapper User name mapper. - */ - public void setUserNameMapper(@Nullable UserNameMapper usrNameMapper) { - this.usrNameMapper = usrNameMapper; - } - - /** {@inheritDoc} */ - @Override public void start() throws IgniteException { - cfg = HadoopUtils.safeCreateConfiguration(); - - if (cfgPaths != null) { - for (String cfgPath : cfgPaths) { - if (cfgPath == null) - throw new NullPointerException("Configuration path cannot be null: " + Arrays.toString(cfgPaths)); - else { - URL url = U.resolveIgniteUrl(cfgPath); - - if (url == null) { - // If secConfPath is given, it should be resolvable: - throw new IgniteException("Failed to resolve secondary file system configuration path " + - "(ensure that it exists locally and you have read access to it): " + cfgPath); - } - - cfg.addResource(url); - } - } - } - - // If secondary fs URI is not given explicitly, try to get it from the configuration: - if (uri == null) - fullUri = FileSystem.getDefaultUri(cfg); - else { - try { - fullUri = new URI(uri); - } - catch (URISyntaxException use) { - throw new IgniteException("Failed to resolve secondary file system URI: " + uri); - } - } - - if (usrNameMapper != null && usrNameMapper instanceof LifecycleAware) - ((LifecycleAware)usrNameMapper).start(); - } - - /** {@inheritDoc} */ - @Override public void stop() throws IgniteException { - if (usrNameMapper != null && usrNameMapper instanceof LifecycleAware) - ((LifecycleAware)usrNameMapper).stop(); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, uri); - - if (cfgPaths != 
null) { - out.writeInt(cfgPaths.length); - - for (String cfgPath : cfgPaths) - U.writeString(out, cfgPath); - } - else - out.writeInt(-1); - - out.writeObject(usrNameMapper); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - uri = U.readString(in); - - int cfgPathsCnt = in.readInt(); - - if (cfgPathsCnt != -1) { - cfgPaths = new String[cfgPathsCnt]; - - for (int i = 0; i < cfgPathsCnt; i++) - cfgPaths[i] = U.readString(in); - } - - usrNameMapper = (UserNameMapper)in.readObject(); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java deleted file mode 100644 index bcbb082..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.hadoop.fs; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; -import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils; -import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap; - -import java.io.IOException; -import java.net.URI; - -/** - * Caching Hadoop file system factory. Caches {@link FileSystem} instances on per-user basis. Doesn't rely on - * built-in Hadoop {@code FileSystem} caching mechanics. Separate {@code FileSystem} instance is created for each - * user instead. - * <p> - * This makes cache instance resistant to concurrent calls to {@link FileSystem#close()} in other parts of the user - * code. On the other hand, this might cause problems on some environments. E.g. if Kerberos is enabled, a call to - * {@link FileSystem#get(URI, Configuration, String)} will refresh Kerberos token. But this factory implementation - * calls this method only once per user what may lead to token expiration. In such cases it makes sense to either - * use {@link BasicHadoopFileSystemFactory} or implement your own factory. - */ -public class CachingHadoopFileSystemFactory extends BasicHadoopFileSystemFactory { - /** */ - private static final long serialVersionUID = 0L; - - /** Per-user file system cache. */ - private final transient HadoopLazyConcurrentMap<String, FileSystem> cache = new HadoopLazyConcurrentMap<>( - new HadoopLazyConcurrentMap.ValueFactory<String, FileSystem>() { - @Override public FileSystem createValue(String key) throws IOException { - return CachingHadoopFileSystemFactory.super.getWithMappedName(key); - } - } - ); - - /** - * Public non-arg constructor. 
- */ - public CachingHadoopFileSystemFactory() { - // noop - } - - /** {@inheritDoc} */ - @Override public FileSystem getWithMappedName(String name) throws IOException { - return cache.getOrCreate(name); - } - - /** {@inheritDoc} */ - @Override public void start() throws IgniteException { - super.start(); - - // Disable caching. - cfg.setBoolean(HadoopFileSystemsUtils.disableFsCachePropertyName(fullUri.getScheme()), true); - } - - /** {@inheritDoc} */ - @Override public void stop() throws IgniteException { - super.stop(); - - try { - cache.close(); - } - catch (IgniteCheckedException ice) { - throw new IgniteException(ice); - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java deleted file mode 100644 index 5ad08ab..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.hadoop.fs; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem; -import org.apache.ignite.igfs.IgfsMode; -import org.apache.ignite.lifecycle.LifecycleAware; - -import java.io.IOException; -import java.io.Serializable; - -/** - * Factory for Hadoop {@link FileSystem} used by {@link IgniteHadoopIgfsSecondaryFileSystem}. - * <p> - * {@link #get(String)} method will be used whenever a call to a target {@code FileSystem} is required. - * <p> - * It is implementation dependent whether to rely on built-in Hadoop file system cache, implement own caching facility - * or doesn't cache file systems at all. - * <p> - * Concrete factory may implement {@link LifecycleAware} interface. In this case start and stop callbacks will be - * performed by Ignite. You may want to implement some initialization or cleanup there. - * <p> - * Note that factory extends {@link Serializable} interface as it might be necessary to transfer factories over the - * wire to {@link IgniteHadoopFileSystem} if {@link IgfsMode#PROXY} is enabled for some file - * system paths. - */ -public interface HadoopFileSystemFactory extends Serializable { - /** - * Gets file system for the given user name. - * - * @param usrName User name - * @return File system. - * @throws IOException In case of error. 
- */ - public FileSystem get(String usrName) throws IOException; -} http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java deleted file mode 100644 index 8085826..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.hadoop.fs; - -import java.io.IOException; -import java.io.PrintStream; -import java.util.Map; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo; -import org.apache.ignite.internal.processors.hadoop.HadoopJob; -import org.apache.ignite.internal.processors.hadoop.HadoopJobId; -import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo; -import org.apache.ignite.internal.processors.hadoop.HadoopUtils; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounterWriter; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter; -import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Job; -import org.apache.ignite.internal.processors.igfs.IgfsUtils; -import org.apache.ignite.internal.util.typedef.T2; - -/** - * Statistic writer implementation that writes info into any Hadoop file system. 
- */ -public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter { - /** */ - public static final String PERFORMANCE_COUNTER_FILE_NAME = "performance"; - - /** */ - public static final String COUNTER_WRITER_DIR_PROPERTY = "ignite.counters.fswriter.directory"; - - /** */ - private static final String USER_MACRO = "${USER}"; - - /** */ - private static final String DEFAULT_COUNTER_WRITER_DIR = "/user/" + USER_MACRO; - - /** {@inheritDoc} */ - @Override public void write(HadoopJob job, HadoopCounters cntrs) - throws IgniteCheckedException { - - Configuration hadoopCfg = HadoopUtils.safeCreateConfiguration(); - - final HadoopJobInfo jobInfo = job.info(); - - final HadoopJobId jobId = job.id(); - - for (Map.Entry<String, String> e : ((HadoopDefaultJobInfo)jobInfo).properties().entrySet()) - hadoopCfg.set(e.getKey(), e.getValue()); - - String user = jobInfo.user(); - - user = IgfsUtils.fixUserName(user); - - String dir = jobInfo.property(COUNTER_WRITER_DIR_PROPERTY); - - if (dir == null) - dir = DEFAULT_COUNTER_WRITER_DIR; - - Path jobStatPath = new Path(new Path(dir.replace(USER_MACRO, user)), jobId.toString()); - - HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(cntrs, null); - - try { - hadoopCfg.set(MRJobConfig.USER_NAME, user); - - FileSystem fs = ((HadoopV2Job)job).fileSystem(jobStatPath.toUri(), hadoopCfg); - - fs.mkdirs(jobStatPath); - - try (PrintStream out = new PrintStream(fs.create(new Path(jobStatPath, PERFORMANCE_COUNTER_FILE_NAME)))) { - for (T2<String, Long> evt : perfCntr.evts()) { - out.print(evt.get1()); - out.print(':'); - out.println(evt.get2().toString()); - } - - out.flush(); - } - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - } -} \ No newline at end of file