One more planner fix, plus enhanced tests.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cfc51d7e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cfc51d7e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cfc51d7e Branch: refs/heads/ignite-3414 Commit: cfc51d7e45eb8693edc92296e87c239a8638159d Parents: d17e53d Author: iveselovskiy <[email protected]> Authored: Thu Jul 7 22:11:38 2016 +0300 Committer: iveselovskiy <[email protected]> Committed: Thu Jul 7 22:11:38 2016 +0300 ---------------------------------------------------------------------- .../IgniteHadoopWeightedMapReducePlanner.java | 71 ++++++++-- .../planner/HadoopAbstractMapReducePlanner.java | 1 - .../processors/hadoop/HadoopMapReduceTest.java | 16 ++- .../HadoopWeightedMapReducePlannerTest.java | 141 +++++++++++++++++-- .../HadoopWeightedPlannerMapReduceTest.java | 38 +++++ .../testsuites/IgniteHadoopTestSuite.java | 2 + 6 files changed, 243 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc51d7e/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java index cfe6113..b869775 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java @@ -17,6 +17,7 @@ package org.apache.ignite.hadoop.mapreduce; +import java.util.LinkedHashSet; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import 
org.apache.ignite.IgniteFileSystem; @@ -229,7 +230,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc res.put(new NodeIdAndLength(id, idToLenEntry.getValue()), id); } - return new ArrayList<>(res.values()); + return new LinkedHashSet<>(res.values()); } } } @@ -247,7 +248,8 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc * @return Result. */ private UUID bestMapperNode(@Nullable Collection<UUID> affIds, HadoopMapReducePlanTopology top) { - // Priority node, may be null or empty. + // Priority node, may be null or empty in case if this is a Hadoop split from a data node that does + // not match any Ignite node. final @Nullable UUID prioAffId = F.first(affIds); // Find group with the least weight. @@ -356,8 +358,11 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc */ private Map<UUID, int[]> assignReducers(Collection<HadoopInputSplit> splits, HadoopMapReducePlanTopology top, Mappers mappers, int reducerCnt) { + // 'reducers' is mapping from node id to number of reducers to be assigned to that node. Map<UUID, Integer> reducers = assignReducers0(top, splits, mappers, reducerCnt); + // Convert the result of previous invocation: number of reducers is converted to same length array filled with + // reducer order number: int cnt = 0; Map<UUID, int[]> res = new HashMap<>(reducers.size()); @@ -378,6 +383,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc /** * Generate reducers. + * Determines the number of reducers to be assigned to each node. * * @param top Topology. * @param splits Input splits. @@ -387,14 +393,18 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc */ private Map<UUID, Integer> assignReducers0(HadoopMapReducePlanTopology top, Collection<HadoopInputSplit> splits, Mappers mappers, int reducerCnt) { - Map<UUID, Integer> res = new HashMap<>(); - // Assign reducers to splits. 
+ // The map contains hom many reducers should be assigned per split: Map<HadoopInputSplit, Integer> splitToReducerCnt = assignReducersToSplits(splits, reducerCnt); +// int sum = sum(splitToReducerCnt); +// assert reducerCnt == sum : "exp: " + reducerCnt + ", act: " + sum; + // Assign as much local reducers as possible. int remaining = 0; + Map<UUID, Integer> res = new HashMap<>(); + for (Map.Entry<HadoopInputSplit, Integer> entry : splitToReducerCnt.entrySet()) { HadoopInputSplit split = entry.getKey(); int cnt = entry.getValue(); @@ -402,6 +412,8 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc if (cnt > 0) { int assigned = assignLocalReducers(split, cnt, top, mappers, res); + assert assigned <= cnt; + remaining += cnt - assigned; } } @@ -410,6 +422,9 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc if (remaining > 0) assignRemoteReducers(remaining, top, mappers, res); +// sum = sum(res); +// assert reducerCnt == sum : "exp: " + reducerCnt + ", act: " + sum; + return res; } @@ -417,11 +432,11 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc * Assign local split reducers. * * @param split Split. - * @param cnt Reducer count. + * @param cnt Reducer count: how many reducers should be assigned for this split. * @param top Topology. * @param mappers Mappers. * @param resMap Reducers result map. - * @return Number of assigned reducers. + * @return Number of locally assigned reducers. */ private int assignLocalReducers(HadoopInputSplit split, int cnt, HadoopMapReducePlanTopology top, Mappers mappers, Map<UUID, Integer> resMap) { @@ -446,12 +461,43 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc // Update result map. if (res > 0) - resMap.put(nodeId, res); + inc(resMap, nodeId, res); return res; } /** + * TODO: Docs + * @param reducers + * @return summary count. 
+ */ + public static int sum(Map<?, Integer> reducers) { + int sum = 0; + + for (Integer i: reducers.values()) + sum += i; + + return sum; + } + + /** + * TODO Docs. + * @param resMap + * @param key + * @param increment + * @param <K> + */ + public static <K> void inc(Map<K, Integer> resMap, K key, int increment) { + assert increment > 0; + + // Update result map. + Integer res = resMap.get(key); + + // 1 reducer was added for 'id': + resMap.put(key, res == null ? increment : res + increment); + } + + /** * Assign remote reducers. Assign to the least loaded first. * * @param cnt Count. @@ -467,6 +513,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc set.addAll(top.groups()); while (cnt-- > 0) { + // The least loaded machine: HadoopMapReducePlanGroup grp = set.first(); // Look for affinity nodes. @@ -509,10 +556,8 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc assert add; - // Update result map. - Integer res = resMap.get(id); - - resMap.put(id, res == null ? 1 : res + 1); + // Exactly 1 reducer was added for 'id': + inc(resMap, id, 1); } } @@ -544,10 +589,14 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc int reducerCnt) { Map<HadoopInputSplit, Integer> res = new IdentityHashMap<>(splits.size()); + // reducers per 1 split: int base = reducerCnt / splits.size(); + // TODO: may that be not equal to (reducerCnt % splits.size()) ? int remainder = reducerCnt - base * splits.size(); + // Not considering remainder, each split gets 'base' reducers. 
+ // Distribute the remainder with round-robin strategy: for (HadoopInputSplit split : splits) { int val = base; http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc51d7e/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java index 166804f..f01f72b 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java @@ -28,7 +28,6 @@ import org.apache.ignite.resources.LoggerResource; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; import java.util.UUID; http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc51d7e/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java index 1cb4f74..5d1de38 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java @@ -20,9 +20,7 @@ package org.apache.ignite.internal.processors.hadoop; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; -import java.util.ArrayList; import 
java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; @@ -395,10 +393,24 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest { cfg.setLocalHost("127.0.0.1"); cfg.setConnectorConfiguration(null); + HadoopConfiguration hadoopCfg = createHadoopConfiguration(); + + if (hadoopCfg != null) + cfg.setHadoopConfiguration(hadoopCfg); + return G.start(cfg); } /** + * Creates custom Hadoop configuration. + * + * @return The Hadoop configuration. + */ + protected HadoopConfiguration createHadoopConfiguration() { + return null; + } + + /** * @return IGFS configuration. */ @Override public FileSystemConfiguration igfsConfiguration() throws Exception { http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc51d7e/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedMapReducePlannerTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedMapReducePlannerTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedMapReducePlannerTest.java index 4fe075c..8c7cc49 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedMapReducePlannerTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedMapReducePlannerTest.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.processors.hadoop; +import java.util.HashSet; +import java.util.Set; import org.apache.ignite.cluster.ClusterMetrics; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.hadoop.mapreduce.IgniteHadoopWeightedMapReducePlanner; @@ -109,11 +111,13 @@ public class HadoopWeightedMapReducePlannerTest extends GridCommonAbstractTest { public void testOneIgfsSplitAffinity() throws Exception { IgfsMock igfs = LocationsBuilder.create().add(0, 
NODE_1).add(50, NODE_2).add(100, NODE_3).buildIgfs(); - Collection<HadoopInputSplit> splits = new ArrayList<>(); + List<HadoopInputSplit> splits = new ArrayList<>(); splits.add(new HadoopFileBlock(new String[] { HOST_1 }, URI.create("igfs://igfs@/file"), 0, 50)); - HadoopPlannerMockJob job = new HadoopPlannerMockJob(splits, 1); + final int expReducers = 4; + + HadoopPlannerMockJob job = new HadoopPlannerMockJob(splits, expReducers); IgniteHadoopWeightedMapReducePlanner planner = createPlanner(igfs); @@ -122,36 +126,149 @@ public class HadoopWeightedMapReducePlannerTest extends GridCommonAbstractTest { assert plan.mappers() == 1; assert plan.mapperNodeIds().size() == 1; assert plan.mapperNodeIds().contains(ID_1); + + checkPlanMappers(plan, splits, NODES, false/*only 1 split*/); + checkPlanReducers(plan, NODES, expReducers, false/* because of threshold behavior.*/); } /** - * Test one HDFS split being assigned to affinity node. + * Test one HDFS splits. * * @throws Exception If failed. */ - public void testOneHdfsSplitAffinity() throws Exception { + public void testHdfsSplitsAffinity() throws Exception { IgfsMock igfs = LocationsBuilder.create().add(0, NODE_1).add(50, NODE_2).add(100, NODE_3).buildIgfs(); - Collection<HadoopInputSplit> splits = new ArrayList<>(); + final List<HadoopInputSplit> splits = new ArrayList<>(); splits.add(new HadoopFileBlock(new String[] { HOST_1 }, URI.create("hfds://" + HOST_1 + "/x"), 0, 50)); splits.add(new HadoopFileBlock(new String[] { HOST_2 }, URI.create("hfds://" + HOST_2 + "/x"), 50, 100)); splits.add(new HadoopFileBlock(new String[] { HOST_3 }, URI.create("hfds://" + HOST_3 + "/x"), 100, 37)); - // hosts that are out of Ignite topology at all: + // The following splits belong to hosts that are out of Ignite topology at all. 
+ // This means that these splits should be assigned to any least loaded nodes: splits.add(new HadoopFileBlock(new String[] { HOST_4 }, URI.create("hfds://" + HOST_4 + "/x"), 138, 2)); splits.add(new HadoopFileBlock(new String[] { HOST_5 }, URI.create("hfds://" + HOST_5 + "/x"), 140, 3)); - HadoopPlannerMockJob job = new HadoopPlannerMockJob(splits, 3); + final int expReducers = 7; + + HadoopPlannerMockJob job = new HadoopPlannerMockJob(splits, expReducers); IgniteHadoopWeightedMapReducePlanner planner = createPlanner(igfs); - HadoopMapReducePlan plan = planner.preparePlan(job, NODES, null); + final HadoopMapReducePlan plan = planner.preparePlan(job, NODES, null); - assertEquals(5, plan.mappers()); - assertEquals(3, plan.mapperNodeIds().size()); - assert plan.mapperNodeIds().contains(ID_1); + checkPlanMappers(plan, splits, NODES, true); + + checkPlanReducers(plan, NODES, expReducers, true); + } + + /** + * Test HDFS splits with Replication == 3. + * + * @throws Exception If failed. + */ + public void testHdfsSplitsReplication() throws Exception { + IgfsMock igfs = LocationsBuilder.create().add(0, NODE_1).add(50, NODE_2).add(100, NODE_3).buildIgfs(); + + final List<HadoopInputSplit> splits = new ArrayList<>(); + + splits.add(new HadoopFileBlock(new String[] { HOST_1, HOST_2, HOST_3 }, URI.create("hfds://" + HOST_1 + "/x"), 0, 50)); + splits.add(new HadoopFileBlock(new String[] { HOST_2, HOST_3, HOST_4 }, URI.create("hfds://" + HOST_2 + "/x"), 50, 100)); + splits.add(new HadoopFileBlock(new String[] { HOST_3, HOST_4, HOST_5 }, URI.create("hfds://" + HOST_3 + "/x"), 100, 37)); + // The following splits belong to hosts that are out of Ignite topology at all.
+ // This means that these splits should be assigned to any least loaded nodes: + splits.add(new HadoopFileBlock(new String[] { HOST_4, HOST_5, HOST_1 }, URI.create("hfds://" + HOST_4 + "/x"), 138, 2)); + splits.add(new HadoopFileBlock(new String[] { HOST_5, HOST_1, HOST_2 }, URI.create("hfds://" + HOST_5 + "/x"), 140, 3)); + + final int expReducers = 8; + + HadoopPlannerMockJob job = new HadoopPlannerMockJob(splits, expReducers); + + IgniteHadoopWeightedMapReducePlanner planner = createPlanner(igfs); + + final HadoopMapReducePlan plan = planner.preparePlan(job, NODES, null); + + checkPlanMappers(plan, splits, NODES, true); + + checkPlanReducers(plan, NODES, expReducers, true); + } + + /** + * Collects the IDs of the given cluster nodes. + * + * @param nodes Cluster nodes. + * @return Set of node IDs. + */ + private static Set<UUID> allIds(Collection<ClusterNode> nodes) { + Set<UUID> allIds = new HashSet<>(); + + for (ClusterNode n : nodes) + allIds.add(n.id()); + + return allIds; + } + + /** + * Checks that the plan has exactly one mapper per input split and that every split is assigned to exactly + * one mapper node. If {@code expectUniformity} is set, also checks that mappers are placed on every node. + * + * @param plan Plan to check. + * @param splits Input splits of the job. + * @param nodes Topology nodes. + * @param expectUniformity Whether mappers are expected on all nodes. + */ + private static void checkPlanMappers(HadoopMapReducePlan plan, List<HadoopInputSplit> splits, + Collection<ClusterNode> nodes, boolean expectUniformity) { + // Number of mappers should correspond to the number of input splits: + assertEquals(splits.size(), plan.mappers()); + + if (expectUniformity) { + // mappers are assigned to all available nodes: + assertEquals(nodes.size(), plan.mapperNodeIds().size()); + + + assertEquals(allIds(nodes), plan.mapperNodeIds()); + } + + // Check all splits are covered by mappers: + Set<HadoopInputSplit> set = new HashSet<>(); + + for (UUID id: plan.mapperNodeIds()) { + Collection<HadoopInputSplit> sp = plan.mappers(id); + + assert sp != null; + + for (HadoopInputSplit s: sp) + assertTrue(set.add(s)); + } + + // must be of the same size & contain same elements: + assertEquals(set, new HashSet<>(splits)); + } + + /** + * Checks that the plan contains exactly {@code expReducers} reducers and that the reducer indices sum to the + * same total as the consecutive integers from 0 to {@code expReducers - 1}. If {@code expectUniformity} is + * set, also checks that reducers are placed on every node. + * + * @param plan Plan to check. + * @param nodes Topology nodes. + * @param expReducers Expected number of reducers. + * @param expectUniformity Whether reducers are expected on all nodes.
+ */ + private static void checkPlanReducers(HadoopMapReducePlan plan, + Collection<ClusterNode> nodes, int expReducers, boolean expectUniformity) { + + assertEquals(expReducers, plan.reducers()); + + if (expectUniformity) + assertEquals(allIds(nodes), plan.reducerNodeIds()); + + int sum = 0; + int lenSum = 0; + + for (UUID uuid: plan.reducerNodeIds()) { + int[] rr = plan.reducers(uuid); + + assert rr != null; + + lenSum += rr.length; + + for (int i: rr) + sum += i; + } + + assertEquals(expReducers, lenSum); - // TODO: enhance tests. + // Numbers in the arrays must be consequtive integers stating from 0, + // check that simply calculating their total sum: + assertEquals((lenSum * (lenSum - 1) / 2), sum); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc51d7e/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedPlannerMapReduceTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedPlannerMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedPlannerMapReduceTest.java new file mode 100644 index 0000000..e0403c2 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedPlannerMapReduceTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.configuration.HadoopConfiguration; +import org.apache.ignite.hadoop.mapreduce.IgniteHadoopWeightedMapReducePlanner; + +/** + * Tests whole map-red execution Weighted planner. + */ +public class HadoopWeightedPlannerMapReduceTest extends HadoopMapReduceTest { + /** {@inheritDoc} */ + @Override protected HadoopConfiguration createHadoopConfiguration() { + HadoopConfiguration hadoopCfg = new HadoopConfiguration(); + + // Use weighted planner with default settings: + IgniteHadoopWeightedMapReducePlanner planner = new IgniteHadoopWeightedMapReducePlanner(); + + hadoopCfg.setMapReducePlanner(planner); + + return hadoopCfg; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc51d7e/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java index feecd14..52104a8 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java @@ -70,6 +70,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopTasksV2Test; import org.apache.ignite.internal.processors.hadoop.HadoopV2JobSelfTest; import org.apache.ignite.internal.processors.hadoop.HadoopValidationSelfTest; 
import org.apache.ignite.internal.processors.hadoop.HadoopWeightedMapReducePlannerTest; +import org.apache.ignite.internal.processors.hadoop.HadoopWeightedPlannerMapReduceTest; import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopConcurrentHashMultimapSelftest; import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopHashMapSelfTest; import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopSkipListSelfTest; @@ -176,6 +177,7 @@ public class IgniteHadoopTestSuite extends TestSuite { suite.addTest(new TestSuite(ldr.loadClass(HadoopTasksV2Test.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(HadoopMapReduceTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopWeightedPlannerMapReduceTest.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(HadoopMapReduceEmbeddedSelfTest.class.getName())));
