http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestRoundRobinMrPlanner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestRoundRobinMrPlanner.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestRoundRobinMrPlanner.java deleted file mode 100644 index edafecd..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestRoundRobinMrPlanner.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.UUID; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan; -import org.jetbrains.annotations.Nullable; - -/** - * Round-robin mr planner. 
- */ -public class HadoopTestRoundRobinMrPlanner implements HadoopMapReducePlanner { - /** {@inheritDoc} */ - @Override public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> top, - @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException { - if (top.isEmpty()) - throw new IllegalArgumentException("Topology is empty"); - - // Has at least one element. - Iterator<ClusterNode> it = top.iterator(); - - Map<UUID, Collection<HadoopInputSplit>> mappers = new HashMap<>(); - - for (HadoopInputSplit block : job.input()) { - ClusterNode node = it.next(); - - Collection<HadoopInputSplit> nodeBlocks = mappers.get(node.id()); - - if (nodeBlocks == null) { - nodeBlocks = new ArrayList<>(); - - mappers.put(node.id(), nodeBlocks); - } - - nodeBlocks.add(block); - - if (!it.hasNext()) - it = top.iterator(); - } - - int[] rdc = new int[job.info().reducers()]; - - for (int i = 0; i < rdc.length; i++) - rdc[i] = i; - - return new HadoopDefaultMapReducePlan(mappers, Collections.singletonMap(it.next().id(), rdc)); - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestTaskContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestTaskContext.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestTaskContext.java deleted file mode 100644 index f542cf2..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestTaskContext.java +++ /dev/null @@ -1,228 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInput; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobConf; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2TaskContext; - -/** - * Context for test purpose. - */ -class HadoopTestTaskContext extends HadoopV2TaskContext { - /** - * Simple key-vale pair. - * @param <K> Key class. - * @param <V> Value class. - */ - public static class Pair<K,V> { - /** Key */ - private K key; - - /** Value */ - private V val; - - /** - * @param key key. - * @param val value. - */ - Pair(K key, V val) { - this.key = key; - this.val = val; - } - - /** - * Getter of key. - * @return key. - */ - K key() { - return key; - } - - /** - * Getter of value. - * @return value. - */ - V value() { - return val; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return key + "," + val; - } - } - - /** Mock output container- result data of task execution if it is not overridden. */ - private List<Pair<String, Integer>> mockOutput = new ArrayList<>(); - - /** Mock input container- input data if it is not overridden. */ - private Map<Object,List> mockInput = new TreeMap<>(); - - /** Context output implementation to write data into mockOutput. 
*/ - private HadoopTaskOutput output = new HadoopTaskOutput() { - /** {@inheritDoc} */ - @Override public void write(Object key, Object val) { - //Check of casting and extract/copy values - String strKey = new String(((Text)key).getBytes()); - int intVal = ((IntWritable)val).get(); - - mockOutput().add(new Pair<>(strKey, intVal)); - } - - /** {@inheritDoc} */ - @Override public void close() { - throw new UnsupportedOperationException(); - } - }; - - /** Context input implementation to read data from mockInput. */ - private HadoopTaskInput input = new HadoopTaskInput() { - /** Iterator of keys and associated lists of values. */ - Iterator<Map.Entry<Object, List>> iter; - - /** Current key and associated value list. */ - Map.Entry<Object, List> currEntry; - - /** {@inheritDoc} */ - @Override public boolean next() { - if (iter == null) - iter = mockInput().entrySet().iterator(); - - if (iter.hasNext()) - currEntry = iter.next(); - else - currEntry = null; - - return currEntry != null; - } - - /** {@inheritDoc} */ - @Override public Object key() { - return currEntry.getKey(); - } - - /** {@inheritDoc} */ - @Override public Iterator<?> values() { - return currEntry.getValue().iterator() ; - } - - /** {@inheritDoc} */ - @Override public void close() { - throw new UnsupportedOperationException(); - } - }; - - /** - * Getter of mock output container - result of task if it is not overridden. - * - * @return mock output. - */ - public List<Pair<String, Integer>> mockOutput() { - return mockOutput; - } - - /** - * Getter of mock input container- input data if it is not overridden. - * - * @return mock output. - */ - public Map<Object, List> mockInput() { - return mockInput; - } - - /** - * Generate one-key-multiple-values tree from array of key-value pairs, and wrap its into Writable objects. - * The result is placed into mock input. - * - * @param flatData list of key-value pair. 
- */ - public void makeTreeOfWritables(Iterable<Pair<String, Integer>> flatData) { - Text key = new Text(); - - for (HadoopTestTaskContext.Pair<String, Integer> pair : flatData) { - key.set(pair.key); - ArrayList<IntWritable> valList; - - if (!mockInput.containsKey(key)) { - valList = new ArrayList<>(); - mockInput.put(key, valList); - key = new Text(); - } - else - valList = (ArrayList<IntWritable>) mockInput.get(key); - valList.add(new IntWritable(pair.value())); - } - } - - /** - * @param taskInfo Task info. - * @param gridJob Grid Hadoop job. - */ - public HadoopTestTaskContext(HadoopTaskInfo taskInfo, HadoopJob gridJob) throws IgniteCheckedException { - super(taskInfo, gridJob, gridJob.id(), null, jobConfDataInput(gridJob)); - } - - /** - * Creates DataInput to read JobConf. - * - * @param job Job. - * @return DataInput with JobConf. - * @throws IgniteCheckedException If failed. - */ - private static DataInput jobConfDataInput(HadoopJob job) throws IgniteCheckedException { - JobConf jobConf = new JobConf(); - - for (Map.Entry<String, String> e : ((HadoopDefaultJobInfo)job.info()).properties().entrySet()) - jobConf.set(e.getKey(), e.getValue()); - - ByteArrayOutputStream buf = new ByteArrayOutputStream(); - - try { - jobConf.write(new DataOutputStream(buf)); - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - - return new DataInputStream(new ByteArrayInputStream(buf.toByteArray())); - } - - /** {@inheritDoc} */ - @Override public HadoopTaskOutput output() { - return output; - } - - /** {@inheritDoc} */ - @Override public HadoopTaskInput input() { - return input; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestUtils.java 
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestUtils.java deleted file mode 100644 index da0d922..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestUtils.java +++ /dev/null @@ -1,178 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.jetbrains.annotations.Nullable; - -import java.io.BufferedReader; -import java.io.File; -import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -/** - * Utility class for tests. - */ -public class HadoopTestUtils { - /** Base test directory. */ - private static final File BASE_TEST_DIR = new File(U.getIgniteHome() + "/work/test/hadoop/"); - - /** - * @return Base directory for tests. - */ - public static File baseTestDir() { - return BASE_TEST_DIR; - } - - /** - * Get test directory. - * - * @param parts Parts. 
- * @return Directory. - */ - public static File testDir(String... parts) { - File res = BASE_TEST_DIR; - - if (parts != null) { - for (String part : parts) - res = new File(res, part); - } - - return res; - } - - /** - * Clear base test directory. - */ - public static void clearBaseTestDir() { - if (baseTestDir().exists()) - assert delete(baseTestDir()); - } - - /** - * Checks that job statistics file contains valid strings only. - * - * @param reader Buffered reader to get lines of job statistics. - * @return Amount of events. - * @throws IOException If failed. - */ - @SuppressWarnings("ResultOfMethodCallIgnored") - public static long simpleCheckJobStatFile(BufferedReader reader) throws IOException { - Collection<String> phases = new HashSet<>(); - - phases.add("submit"); - phases.add("prepare"); - phases.add("start"); - phases.add("finish"); - phases.add("requestId"); - phases.add("responseId"); - - Collection<String> evtTypes = new HashSet<>(); - - evtTypes.add("JOB"); - evtTypes.add("SETUP"); - evtTypes.add("MAP"); - evtTypes.add("SHUFFLE"); - evtTypes.add("REDUCE"); - evtTypes.add("COMBINE"); - evtTypes.add("COMMIT"); - - long evtCnt = 0; - String line; - - Map<Long, String> reduceNodes = new HashMap<>(); - - while((line = reader.readLine()) != null) { - String[] splitLine = line.split(":"); - - //Try parse timestamp - Long.parseLong(splitLine[1]); - - String[] evt = splitLine[0].split(" "); - - assertTrue("Unknown event '" + evt[0] + "'", evtTypes.contains(evt[0])); - - String phase; - - if ("JOB".equals(evt[0])) - phase = evt[1]; - else { - assertEquals(4, evt.length); - assertTrue("The node id is not defined", !F.isEmpty(evt[3])); - - long taskNum = Long.parseLong(evt[1]); - - if (("REDUCE".equals(evt[0]) || "SHUFFLE".equals(evt[0]))) { - String nodeId = reduceNodes.get(taskNum); - - if (nodeId == null) - reduceNodes.put(taskNum, evt[3]); - else - assertEquals("Different nodes for SHUFFLE and REDUCE tasks", nodeId, evt[3]); - } - - phase = evt[2]; - } - - 
assertTrue("Unknown phase '" + phase + "' in " + Arrays.toString(evt), phases.contains(phase)); - - evtCnt++; - } - - return evtCnt; - } - - /** - * Deletes file or directory with all sub-directories and files. - * - * @param file File or directory to delete. - * @return {@code true} if and only if the file or directory is successfully deleted, - * {@code false} otherwise - */ - public static boolean delete(@Nullable File file) { - if (file == null) - return false; - - boolean res = true; - - if (file.isDirectory()) { - File[] files = file.listFiles(); - - if (files != null && files.length > 0) - for (File file1 : files) - if (file1.isDirectory()) - res &= delete(file1); - else - res &= file1.delete(); - - res &= file.delete(); - } - else - res = file.delete(); - - return res; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopUserLibsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopUserLibsSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopUserLibsSelfTest.java deleted file mode 100644 index 9e3c8f4..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopUserLibsSelfTest.java +++ /dev/null @@ -1,260 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; - -import java.io.File; -import java.io.IOException; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; - -/** - * Tests for user libs parsing. - */ -public class HadoopUserLibsSelfTest extends GridCommonAbstractTest { - /** Directory 1. */ - private static final File DIR_1 = HadoopTestUtils.testDir("dir1"); - - /** File 1 in directory 1. */ - private static final File FILE_1_1 = new File(DIR_1, "file1.jar"); - - /** File 2 in directory 1. */ - private static final File FILE_1_2 = new File(DIR_1, "file2.jar"); - - /** Directory 2. */ - private static final File DIR_2 = HadoopTestUtils.testDir("dir2"); - - /** File 1 in directory 2. */ - private static final File FILE_2_1 = new File(DIR_2, "file1.jar"); - - /** File 2 in directory 2. */ - private static final File FILE_2_2 = new File(DIR_2, "file2.jar"); - - /** Missing directory. */ - private static final File MISSING_DIR = HadoopTestUtils.testDir("missing_dir"); - - /** Missing file. 
*/ - private static final File MISSING_FILE = new File(MISSING_DIR, "file.jar"); - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - HadoopTestUtils.clearBaseTestDir(); - - assert DIR_1.mkdirs(); - assert DIR_2.mkdirs(); - - assert FILE_1_1.createNewFile(); - assert FILE_1_2.createNewFile(); - assert FILE_2_1.createNewFile(); - assert FILE_2_2.createNewFile(); - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - // Sanity checks before test start. - ensureExists(FILE_1_1); - ensureExists(FILE_1_2); - ensureExists(FILE_2_1); - ensureExists(FILE_2_2); - - ensureNotExists(MISSING_DIR); - ensureNotExists(MISSING_FILE); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - HadoopTestUtils.clearBaseTestDir(); - } - - /** - * Test null or empty user libs. - * - * @throws Exception If failed. - */ - public void testNullOrEmptyUserLibs() throws Exception { - assert parse(null).isEmpty(); - assert parse("").isEmpty(); - } - - /** - * Test single file. - * - * @throws Exception If failed. - */ - public void testSingle() throws Exception { - Collection<File> res = parse(single(FILE_1_1)); - - assert res.size() == 1; - assert res.contains(FILE_1_1); - - res = parse(single(MISSING_FILE)); - - assert res.size() == 0; - } - - /** - * Test multiple files. - * - * @throws Exception If failed. - */ - public void testMultiple() throws Exception { - Collection<File> res = - parse(merge(single(FILE_1_1), single(FILE_1_2), single(FILE_2_1), single(FILE_2_2), single(MISSING_FILE))); - - assert res.size() == 4; - assert res.contains(FILE_1_1); - assert res.contains(FILE_1_2); - assert res.contains(FILE_2_1); - assert res.contains(FILE_2_2); - } - - /** - * Test single wildcard. - * - * @throws Exception If failed. 
- */ - public void testSingleWildcard() throws Exception { - Collection<File> res = parse(wildcard(DIR_1)); - - assert res.size() == 2; - assert res.contains(FILE_1_1); - assert res.contains(FILE_1_2); - - res = parse(wildcard(MISSING_DIR)); - - assert res.size() == 0; - } - - /** - * Test multiple wildcards. - * - * @throws Exception If failed. - */ - public void testMultipleWildcards() throws Exception { - Collection<File> res = parse(merge(wildcard(DIR_1), wildcard(DIR_2), wildcard(MISSING_DIR))); - - assert res.size() == 4; - assert res.contains(FILE_1_1); - assert res.contains(FILE_1_2); - assert res.contains(FILE_2_1); - assert res.contains(FILE_2_2); - } - - /** - * Test mixed tokens. - * - * @throws Exception If failed. - */ - public void testMixed() throws Exception { - String str = merge( - single(FILE_1_1), - wildcard(DIR_2), - single(MISSING_FILE), - wildcard(MISSING_DIR) - ); - - Collection<File> res = parse(str); - - assert res.size() == 3; - assert res.contains(FILE_1_1); - assert res.contains(FILE_2_1); - assert res.contains(FILE_2_2); - } - /** - * Ensure provided file exists. - * - * @param file File. - */ - private static void ensureExists(File file) { - assert file.exists(); - } - - /** - * Ensure provided file doesn't exist. - * - * @param file File. - */ - private static void ensureNotExists(File file) { - assert !file.exists(); - } - - /** - * Merge string using path separator. - * - * @param vals Values. - * @return Result. - */ - private static String merge(String... vals) { - StringBuilder res = new StringBuilder(); - - if (vals != null) { - boolean first = true; - - for (String val : vals) { - if (first) - first = false; - else - res.append(File.pathSeparatorChar); - - res.append(val); - } - } - - return res.toString(); - } - - /** - * Parse string. - * - * @param str String. - * @return Files. - * @throws IOException If failed. 
- */ - Collection<File> parse(String str) throws IOException { - Collection<HadoopClasspathUtils.SearchDirectory> dirs = HadoopClasspathUtils.parseUserLibs(str); - - Collection<File> res = new HashSet<>(); - - for (HadoopClasspathUtils.SearchDirectory dir : dirs) - Collections.addAll(res, dir.files()); - - return res; - } - - /** - * Get absolute path to a single file. - * - * @param file File. - * @return Path. - */ - private static String single(File file) { - return file.getAbsolutePath(); - } - - /** - * Create a wildcard. - * - * @param file File. - * @return Wildcard. - */ - private static String wildcard(File file) { - return file.getAbsolutePath() + File.separatorChar + "*"; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java deleted file mode 100644 index ae2c00d..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import java.io.ByteArrayInputStream; -import java.io.DataInput; -import java.io.DataInputStream; -import java.io.InputStream; -import java.util.UUID; -import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.serializer.Deserializer; -import org.apache.hadoop.io.serializer.WritableSerialization; -import org.apache.hadoop.mapred.JobConf; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.processors.hadoop.v2.HadoopSerializationWrapper; -import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Job; - -import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.createJobInfo; - -/** - * Self test of {@link org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Job}. - */ -public class HadoopV2JobSelfTest extends HadoopAbstractSelfTest { - /** */ - private static final String TEST_SERIALIZED_VALUE = "Test serialized value"; - - /** - * Custom serialization class that accepts {@link Writable}. 
- */ - private static class CustomSerialization extends WritableSerialization { - /** {@inheritDoc} */ - @Override public Deserializer<Writable> getDeserializer(Class<Writable> c) { - return new Deserializer<Writable>() { - @Override public void open(InputStream in) { } - - @Override public Writable deserialize(Writable writable) { - return new Text(TEST_SERIALIZED_VALUE); - } - - @Override public void close() { } - }; - } - } - - /** - * Tests that {@link HadoopJob} provides wrapped serializer if it's set in configuration. - * - * @throws IgniteCheckedException If fails. - */ - public void testCustomSerializationApplying() throws IgniteCheckedException { - JobConf cfg = new JobConf(); - - cfg.setMapOutputKeyClass(IntWritable.class); - cfg.setMapOutputValueClass(Text.class); - cfg.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName()); - - HadoopDefaultJobInfo info = createJobInfo(cfg); - - final UUID uuid = UUID.randomUUID(); - - HadoopJobId id = new HadoopJobId(uuid, 1); - - HadoopJob job = info.createJob(HadoopV2Job.class, id, log, null); - - HadoopTaskContext taskCtx = job.getTaskContext(new HadoopTaskInfo(HadoopTaskType.MAP, null, 0, 0, - null)); - - HadoopSerialization ser = taskCtx.keySerialization(); - - assertEquals(HadoopSerializationWrapper.class.getName(), ser.getClass().getName()); - - DataInput in = new DataInputStream(new ByteArrayInputStream(new byte[0])); - - assertEquals(TEST_SERIALIZED_VALUE, ser.read(in, null).toString()); - - ser = taskCtx.valueSerialization(); - - assertEquals(HadoopSerializationWrapper.class.getName(), ser.getClass().getName()); - - assertEquals(TEST_SERIALIZED_VALUE, ser.read(in, null).toString()); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopValidationSelfTest.java ---------------------------------------------------------------------- diff --git 
a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopValidationSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopValidationSelfTest.java deleted file mode 100644 index 1496150..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopValidationSelfTest.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.configuration.IgniteConfiguration; - -/** - * Configuration validation tests. - */ -public class HadoopValidationSelfTest extends HadoopAbstractSelfTest { - /** Peer class loading enabled flag. 
*/ - public boolean peerClassLoading; - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(true); - - peerClassLoading = false; - } - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - cfg.setPeerClassLoadingEnabled(peerClassLoading); - - return cfg; - } - - /** - * Ensure that Grid starts when all configuration parameters are valid. - * - * @throws Exception If failed. - */ - public void testValid() throws Exception { - startGrids(1); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedMapReducePlannerTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedMapReducePlannerTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedMapReducePlannerTest.java deleted file mode 100644 index 4e7cc50..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedMapReducePlannerTest.java +++ /dev/null @@ -1,599 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.cluster.ClusterMetrics; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.hadoop.mapreduce.IgniteHadoopWeightedMapReducePlanner; -import org.apache.ignite.igfs.IgfsBlockLocation; -import org.apache.ignite.igfs.IgfsPath; -import org.apache.ignite.internal.IgniteNodeAttributes; -import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner; -import org.apache.ignite.internal.processors.igfs.IgfsIgniteMock; -import org.apache.ignite.internal.processors.igfs.IgfsMock; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.lang.IgniteProductVersion; -import org.apache.ignite.testframework.GridTestUtils; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.jetbrains.annotations.Nullable; - -import java.net.URI; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; -import java.util.UUID; - -/** - * Tests for weighted map-reduce planner. Plans are built against a mocked three-node topology and a mocked IGFS. - */ -public class HadoopWeightedMapReducePlannerTest extends GridCommonAbstractTest { - /** ID 1. */ - private static final UUID ID_1 = new UUID(0, 1); - - /** ID 2. */ - private static final UUID ID_2 = new UUID(0, 2); - - /** ID 3. */ - private static final UUID ID_3 = new UUID(0, 3); - - /** MAC 1. */ - private static final String MAC_1 = "mac1"; - - /** MAC 2.
*/ - private static final String MAC_2 = "mac2"; - - /** MAC 3. */ - private static final String MAC_3 = "mac3"; - - /** Host 1. */ - private static final String HOST_1 = "host1"; - - /** Host 2. */ - private static final String HOST_2 = "host2"; - - /** Host 3. */ - private static final String HOST_3 = "host3"; - - /** Host 4 (no node of the standard topology uses it). */ - private static final String HOST_4 = "host4"; - - /** Host 5 (no node of the standard topology uses it). */ - private static final String HOST_5 = "host5"; - - /** Standard node 1. */ - private static final MockNode NODE_1 = new MockNode(ID_1, MAC_1, HOST_1); - - /** Standard node 2. */ - private static final MockNode NODE_2 = new MockNode(ID_2, MAC_2, HOST_2); - - /** Standard node 3. */ - private static final MockNode NODE_3 = new MockNode(ID_3, MAC_3, HOST_3); - - /** Standard nodes. */ - private static final Collection<ClusterNode> NODES; - - /** - * Static initializer. - */ - static { - NODES = new ArrayList<>(); - - NODES.add(NODE_1); - NODES.add(NODE_2); - NODES.add(NODE_3); - } - - /** - * Test one IGFS split being assigned to affinity node. - * - * @throws Exception If failed. - */ - public void testOneIgfsSplitAffinity() throws Exception { - IgfsMock igfs = LocationsBuilder.create().add(0, NODE_1).add(50, NODE_2).add(100, NODE_3).buildIgfs(); - - List<HadoopInputSplit> splits = new ArrayList<>(); - - splits.add(new HadoopFileBlock(new String[] { HOST_1 }, URI.create("igfs://igfs@/file"), 0, 50)); - - final int expReducers = 4; - - HadoopPlannerMockJob job = new HadoopPlannerMockJob(splits, expReducers); - - IgniteHadoopWeightedMapReducePlanner planner = createPlanner(igfs); - - HadoopMapReducePlan plan = planner.preparePlan(job, NODES, null); - - assertEquals(1, plan.mappers()); - assertEquals(1, plan.mapperNodeIds().size()); - assertTrue(plan.mapperNodeIds().contains(ID_1)); - - checkPlanMappers(plan, splits, NODES, false/*only 1 split*/); - checkPlanReducers(plan, NODES, expReducers, false/* because of threshold behavior.*/); - } - - /** - * Test HDFS splits affinity.
- * - * @throws Exception If failed. - */ - public void testHdfsSplitsAffinity() throws Exception { - IgfsMock igfs = LocationsBuilder.create().add(0, NODE_1).add(50, NODE_2).add(100, NODE_3).buildIgfs(); - - final List<HadoopInputSplit> splits = new ArrayList<>(); - - splits.add(new HadoopFileBlock(new String[] { HOST_1 }, URI.create("hfds://" + HOST_1 + "/x"), 0, 50)); - splits.add(new HadoopFileBlock(new String[] { HOST_2 }, URI.create("hfds://" + HOST_2 + "/x"), 50, 100)); - splits.add(new HadoopFileBlock(new String[] { HOST_3 }, URI.create("hfds://" + HOST_3 + "/x"), 100, 37)); - - // The following splits belong to hosts that are out of Ignite topology at all. - // This means that these splits should be assigned to any least loaded nodes: - splits.add(new HadoopFileBlock(new String[] { HOST_4 }, URI.create("hfds://" + HOST_4 + "/x"), 138, 2)); - splits.add(new HadoopFileBlock(new String[] { HOST_5 }, URI.create("hfds://" + HOST_5 + "/x"), 140, 3)); - - final int expReducers = 7; - - HadoopPlannerMockJob job = new HadoopPlannerMockJob(splits, expReducers); - - IgniteHadoopWeightedMapReducePlanner planner = createPlanner(igfs); - - final HadoopMapReducePlan plan = planner.preparePlan(job, NODES, null); - - checkPlanMappers(plan, splits, NODES, true); - - checkPlanReducers(plan, NODES, expReducers, true); - } - - /** - * Test HDFS splits with Replication == 3. - * - * @throws Exception If failed.
- */ - public void testHdfsSplitsReplication() throws Exception { - IgfsMock igfs = LocationsBuilder.create().add(0, NODE_1).add(50, NODE_2).add(100, NODE_3).buildIgfs(); - - final List<HadoopInputSplit> splits = new ArrayList<>(); - - splits.add(new HadoopFileBlock(new String[] { HOST_1, HOST_2, HOST_3 }, URI.create("hfds://" + HOST_1 + "/x"), 0, 50)); - splits.add(new HadoopFileBlock(new String[] { HOST_2, HOST_3, HOST_4 }, URI.create("hfds://" + HOST_2 + "/x"), 50, 100)); - splits.add(new HadoopFileBlock(new String[] { HOST_3, HOST_4, HOST_5 }, URI.create("hfds://" + HOST_3 + "/x"), 100, 37)); - // The following splits are replicated partly on hosts (HOST_4, HOST_5) that no node of the test topology uses, - // so each of them has fewer than three candidate nodes in the topology: - splits.add(new HadoopFileBlock(new String[] { HOST_4, HOST_5, HOST_1 }, URI.create("hfds://" + HOST_4 + "/x"), 138, 2)); - splits.add(new HadoopFileBlock(new String[] { HOST_5, HOST_1, HOST_2 }, URI.create("hfds://" + HOST_5 + "/x"), 140, 3)); - - final int expReducers = 8; - - HadoopPlannerMockJob job = new HadoopPlannerMockJob(splits, expReducers); - - IgniteHadoopWeightedMapReducePlanner planner = createPlanner(igfs); - - final HadoopMapReducePlan plan = planner.preparePlan(job, NODES, null); - - checkPlanMappers(plan, splits, NODES, true); - - checkPlanReducers(plan, NODES, expReducers, true); - } - - /** - * Get all IDs. - * - * @param nodes Nodes. - * @return IDs. - */ - private static Set<UUID> allIds(Collection<ClusterNode> nodes) { - Set<UUID> allIds = new HashSet<>(); - - for (ClusterNode n : nodes) - allIds.add(n.id()); - - return allIds; - } - - /** - * Check mappers for the plan. - * - * @param plan Plan. - * @param splits Splits. - * @param nodes Nodes. - * @param expectUniformity Whether uniformity is expected.
- */ - private static void checkPlanMappers(HadoopMapReducePlan plan, List<HadoopInputSplit> splits, - Collection<ClusterNode> nodes, boolean expectUniformity) { - // Number of mappers should correspond to the number of input splits: - assertEquals(splits.size(), plan.mappers()); - - if (expectUniformity) { - // mappers are assigned to all available nodes: - assertEquals(nodes.size(), plan.mapperNodeIds().size()); - - - assertEquals(allIds(nodes), plan.mapperNodeIds()); - } - - // Check all splits are covered by mappers: - Set<HadoopInputSplit> set = new HashSet<>(); - - for (UUID id: plan.mapperNodeIds()) { - Collection<HadoopInputSplit> sp = plan.mappers(id); - - assertNotNull(sp); - - for (HadoopInputSplit s: sp) - assertTrue(set.add(s)); - } - - // must be of the same size & contain same elements: - assertEquals(set, new HashSet<>(splits)); - } - - /** - * Check plan reducers. - * - * @param plan Plan. - * @param nodes Nodes. - * @param expReducers Expected reducers. - * @param expectUniformity Expected uniformity. - */ - private static void checkPlanReducers(HadoopMapReducePlan plan, - Collection<ClusterNode> nodes, int expReducers, boolean expectUniformity) { - - assertEquals(expReducers, plan.reducers()); - - if (expectUniformity) - assertEquals(allIds(nodes), plan.reducerNodeIds()); - - int sum = 0; - int lenSum = 0; - - for (UUID uuid: plan.reducerNodeIds()) { - int[] rr = plan.reducers(uuid); - - assertNotNull(rr); - - lenSum += rr.length; - - for (int i: rr) - sum += i; - } - - assertEquals(expReducers, lenSum); - - // Numbers in the arrays must be consecutive integers starting from 0, - // check that simply calculating their total sum: - assertEquals((lenSum * (lenSum - 1) / 2), sum); - } - - /** - * Create planner for IGFS. - * - * @param igfs IGFS. - * @return Planner.
- */ - private static IgniteHadoopWeightedMapReducePlanner createPlanner(IgfsMock igfs) { - IgniteHadoopWeightedMapReducePlanner planner = new IgniteHadoopWeightedMapReducePlanner(); - - IgfsIgniteMock ignite = new IgfsIgniteMock(null, igfs); - - GridTestUtils.setFieldValue(planner, HadoopAbstractMapReducePlanner.class, "ignite", ignite); - - return planner; - } - - /** - * Throw {@link UnsupportedOperationException}. - */ - private static void throwUnsupported() { - throw new UnsupportedOperationException("Should not be called!"); - } - - /** - * Mocked node. Only {@link #id()}, {@link #addresses()} and the MACs attribute are supported; - * every other method throws {@link UnsupportedOperationException}. - */ - private static class MockNode implements ClusterNode { - /** ID. */ - private final UUID id; - - /** MAC addresses. */ - private final String macs; - - /** Addresses. */ - private final List<String> addrs; - - /** - * Constructor. - * - * @param id Node ID. - * @param macs MAC addresses. - * @param addrs Addresses. - */ - public MockNode(UUID id, String macs, String... addrs) { - assert addrs != null; - - this.id = id; - this.macs = macs; - - this.addrs = Arrays.asList(addrs); - } - - /** {@inheritDoc} */ - @Override public UUID id() { - return id; - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Nullable @Override public <T> T attribute(String name) { - if (F.eq(name, IgniteNodeAttributes.ATTR_MACS)) - return (T)macs; - - throwUnsupported(); - - return null; - } - - /** {@inheritDoc} */ - @Override public Collection<String> addresses() { - return addrs; - } - - /** {@inheritDoc} */ - @Override public Object consistentId() { - throwUnsupported(); - - return null; - } - - /** {@inheritDoc} */ - @Override public ClusterMetrics metrics() { - throwUnsupported(); - - return null; - } - - /** {@inheritDoc} */ - @Override public Map<String, Object> attributes() { - throwUnsupported(); - - return null; - } - - /** {@inheritDoc} */ - @Override public Collection<String> hostNames() { - throwUnsupported(); - - return null; - } - - /** {@inheritDoc} */ - @Override public long order() { -
throwUnsupported(); - - return 0; - } - - /** {@inheritDoc} */ - @Override public IgniteProductVersion version() { - throwUnsupported(); - - return null; - } - - /** {@inheritDoc} */ - @Override public boolean isLocal() { - throwUnsupported(); - - return false; - } - - /** {@inheritDoc} */ - @Override public boolean isDaemon() { - throwUnsupported(); - - return false; - } - - /** {@inheritDoc} */ - @Override public boolean isClient() { - throwUnsupported(); - - return false; - } - } - - /** - * Locations builder. - */ - private static class LocationsBuilder { - /** Locations: block start offset mapped to the nodes holding that block. */ - private final TreeMap<Long, Collection<MockNode>> locs = new TreeMap<>(); - - /** - * Create new locations builder. - * - * @return Locations builder. - */ - public static LocationsBuilder create() { - return new LocationsBuilder(); - } - - /** - * Add locations. - * - * @param start Start. - * @param nodes Nodes. - * @return This builder for chaining. - */ - public LocationsBuilder add(long start, MockNode... nodes) { - locs.put(start, Arrays.asList(nodes)); - - return this; - } - - /** - * Build locations. - * - * @return Locations. - */ - public TreeMap<Long, Collection<MockNode>> build() { - return locs; - } - - /** - * Build IGFS. - * - * @return IGFS. - */ - public MockIgfs buildIgfs() { - return new MockIgfs(build()); - } - } - - /** - * Mocked IGFS. - */ - private static class MockIgfs extends IgfsMock { - /** Block locations. */ - private final TreeMap<Long, Collection<MockNode>> locs; - - /** - * Constructor. - * - * @param locs Block locations.
- */ - public MockIgfs(TreeMap<Long, Collection<MockNode>> locs) { - super("igfs"); - - this.locs = locs; - } - - /** - * {@inheritDoc} - * <p> - * Walks the configured block starts in ascending order and emits one location per block overlapped by the - * requested range. Each emitted location is limited to the part of the block inside the range, so ranges that - * start or end in the middle of a block are handled as well as block-aligned ones. The {@code path} argument - * is ignored. - */ - @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) { - Collection<IgfsBlockLocation> res = new ArrayList<>(); - - long cur = start; - long remaining = len; - - Collection<MockNode> prevLocNodes = null; - - for (Map.Entry<Long, Collection<MockNode>> locEntry : locs.entrySet()) { - long locStart = locEntry.getKey(); - - if (prevLocNodes != null && cur < locStart) { - /* Add only the requested part of the previous block: never more than is still left to cover. */ - long prevLen = Math.min(locStart - cur, remaining); - - res.add(new IgfsBlockLocationMock(cur, prevLen, prevLocNodes)); - - cur += prevLen; - remaining -= prevLen; - } - - prevLocNodes = locEntry.getValue(); - - if (remaining == 0) - break; - } - - /* Add remainder: whatever is left belongs to the last block visited. */ - if (remaining != 0) - res.add(new IgfsBlockLocationMock(cur, remaining, prevLocNodes)); - - return res; - } - - /** {@inheritDoc} */ - @Override public boolean exists(IgfsPath path) { - return true; - } - - /** {@inheritDoc} */ - @Override public boolean isProxy(URI path) { - return false; - } - } - - /** - * Mocked block location. - */ - private static class IgfsBlockLocationMock implements IgfsBlockLocation { - /** Start. */ - private final long start; - - /** Length. */ - private final long len; - - /** Node IDs. */ - private final List<UUID> nodeIds; - - /** - * Constructor. - * - * @param start Start. - * @param len Length. - * @param nodes Nodes.
- */ - public IgfsBlockLocationMock(long start, long len, Collection<MockNode> nodes) { - this.start = start; - this.len = len; - - this.nodeIds = new ArrayList<>(nodes.size()); - - for (MockNode node : nodes) - nodeIds.add(node.id); - } - - /** {@inheritDoc} */ - @Override public long start() { - return start; - } - - /** {@inheritDoc} */ - @Override public long length() { - return len; - } - - /** {@inheritDoc} */ - @Override public Collection<UUID> nodeIds() { - return nodeIds; - } - - /** {@inheritDoc} */ - @Override public Collection<String> names() { - throwUnsupported(); - - return null; - } - - /** {@inheritDoc} */ - @Override public Collection<String> hosts() { - throwUnsupported(); - - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedPlannerMapReduceTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedPlannerMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedPlannerMapReduceTest.java deleted file mode 100644 index e0403c2..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedPlannerMapReduceTest.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.configuration.HadoopConfiguration; -import org.apache.ignite.hadoop.mapreduce.IgniteHadoopWeightedMapReducePlanner; - -/** - * Runs the whole map-reduce execution test with the weighted planner. - */ -public class HadoopWeightedPlannerMapReduceTest extends HadoopMapReduceTest { - /** - * {@inheritDoc} - * <p> - * Returns a fresh configuration whose map-reduce planner is a weighted planner with default settings. - */ - @Override protected HadoopConfiguration createHadoopConfiguration() { - HadoopConfiguration cfg = new HadoopConfiguration(); - - cfg.setMapReducePlanner(new IgniteHadoopWeightedMapReducePlanner()); - - return cfg; - } -}