IGNITE-3093: Hadoop: Fixed the map-reduce planner for the case when the passed file has the "igfs://" scheme but does not exist in the file system.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/044ca2c3 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/044ca2c3 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/044ca2c3 Branch: refs/heads/ignite-2832 Commit: 044ca2c3d7ddf6f6239c1c15476f89e8d4e991f6 Parents: 4b11219 Author: vozerov-gridgain <[email protected]> Authored: Fri May 6 16:12:36 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Fri May 6 16:12:36 2016 +0300 ---------------------------------------------------------------------- .../mapreduce/IgniteHadoopMapReducePlanner.java | 72 +++++++++++--------- .../HadoopDefaultMapReducePlannerSelfTest.java | 2 +- 2 files changed, 39 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/044ca2c3/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java index 1562a89..287b5ec 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java @@ -179,54 +179,58 @@ public class IgniteHadoopMapReducePlanner implements HadoopMapReducePlanner { igfs = (IgfsEx)((IgniteEx)ignite).igfsx(endpoint.igfs()); if (igfs != null && !igfs.isProxy(split0.file())) { - Collection<IgfsBlockLocation> blocks; + IgfsPath path = new IgfsPath(split0.file()); - try { - blocks = igfs.affinity(new IgfsPath(split0.file()), split0.start(), split0.length()); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - 
} + if (igfs.exists(path)) { + Collection<IgfsBlockLocation> blocks; + + try { + blocks = igfs.affinity(path, split0.start(), split0.length()); + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } - assert blocks != null; + assert blocks != null; - if (blocks.size() == 1) - // Fast-path, split consists of one IGFS block (as in most cases). - return bestNode(blocks.iterator().next().nodeIds(), topIds, nodeLoads, false); - else { - // Slow-path, file consists of multiple IGFS blocks. First, find the most co-located nodes. - Map<UUID, Long> nodeMap = new HashMap<>(); + if (blocks.size() == 1) + // Fast-path, split consists of one IGFS block (as in most cases). + return bestNode(blocks.iterator().next().nodeIds(), topIds, nodeLoads, false); + else { + // Slow-path, file consists of multiple IGFS blocks. First, find the most co-located nodes. + Map<UUID, Long> nodeMap = new HashMap<>(); - List<UUID> bestNodeIds = null; - long bestLen = -1L; + List<UUID> bestNodeIds = null; + long bestLen = -1L; - for (IgfsBlockLocation block : blocks) { - for (UUID blockNodeId : block.nodeIds()) { - if (topIds.contains(blockNodeId)) { - Long oldLen = nodeMap.get(blockNodeId); - long newLen = oldLen == null ? block.length() : oldLen + block.length(); + for (IgfsBlockLocation block : blocks) { + for (UUID blockNodeId : block.nodeIds()) { + if (topIds.contains(blockNodeId)) { + Long oldLen = nodeMap.get(blockNodeId); + long newLen = oldLen == null ? 
block.length() : oldLen + block.length(); - nodeMap.put(blockNodeId, newLen); + nodeMap.put(blockNodeId, newLen); - if (bestNodeIds == null || bestLen < newLen) { - bestNodeIds = new ArrayList<>(1); + if (bestNodeIds == null || bestLen < newLen) { + bestNodeIds = new ArrayList<>(1); - bestNodeIds.add(blockNodeId); + bestNodeIds.add(blockNodeId); - bestLen = newLen; - } - else if (bestLen == newLen) { - assert !F.isEmpty(bestNodeIds); + bestLen = newLen; + } + else if (bestLen == newLen) { + assert !F.isEmpty(bestNodeIds); - bestNodeIds.add(blockNodeId); + bestNodeIds.add(blockNodeId); + } } } } - } - if (bestNodeIds != null) { - return bestNodeIds.size() == 1 ? bestNodeIds.get(0) : - bestNode(bestNodeIds, topIds, nodeLoads, true); + if (bestNodeIds != null) { + return bestNodeIds.size() == 1 ? bestNodeIds.get(0) : + bestNode(bestNodeIds, topIds, nodeLoads, true); + } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/044ca2c3/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java index 2bdf28c..b38f3a2 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java @@ -785,7 +785,7 @@ public class HadoopDefaultMapReducePlannerSelfTest extends HadoopAbstractSelfTes /** {@inheritDoc} */ @Override public boolean exists(IgfsPath path) { - return false; + return true; } /** {@inheritDoc} */
