IGNITE-4281: Hadoop: decoupled remote and local maps to simplify further optimizations. This closes #1264. This closes #1315.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/be12a7ea Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/be12a7ea Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/be12a7ea Branch: refs/heads/master Commit: be12a7ea242dedba932c15dce005540c34711e77 Parents: 04cff9b Author: devozerov <[email protected]> Authored: Mon Dec 5 17:09:28 2016 +0300 Committer: devozerov <[email protected]> Committed: Thu Dec 15 13:46:01 2016 +0300 ---------------------------------------------------------------------- .../hadoop/shuffle/HadoopShuffleJob.java | 85 ++++++++++++++------ 1 file changed, 61 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/be12a7ea/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java index aca5fdf..3afb55a 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java @@ -17,9 +17,7 @@ package org.apache.ignite.internal.processors.hadoop.shuffle; -import java.util.HashMap; import java.util.Iterator; -import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; @@ -56,8 +54,8 @@ import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.thread.IgniteThread; import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.PARTITION_HASHMAP_SIZE; -import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_MSG_SIZE; import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_JOB_THROTTLE; +import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_MSG_SIZE; import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_REDUCER_NO_SORTING; import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.get; @@ -77,20 +75,26 @@ public class HadoopShuffleJob<T> implements AutoCloseable { /** */ private final boolean needPartitioner; - /** Collection of task contexts for each reduce task. */ - private final Map<Integer, LocalTaskContextProxy> reducersCtx = new HashMap<>(); + /** Task contexts for each reduce task. */ + private final AtomicReferenceArray<LocalTaskContextProxy> locReducersCtx; /** Reducers addresses. */ private T[] reduceAddrs; + /** Total reducer count. */ + private final int totalReducerCnt; + /** Local reducers address. */ private final T locReduceAddr; /** */ private final HadoopShuffleMessage[] msgs; - /** */ - private final AtomicReferenceArray<HadoopMultimap> maps; + /** Maps for local reducers. */ + private final AtomicReferenceArray<HadoopMultimap> locMaps; + + /** Maps for remote reducers. */ + private final AtomicReferenceArray<HadoopMultimap> rmtMaps; /** */ private volatile IgniteInClosure2X<T, HadoopMessage> io; @@ -129,23 +133,27 @@ public class HadoopShuffleJob<T> implements AutoCloseable { public HadoopShuffleJob(T locReduceAddr, IgniteLogger log, HadoopJob job, GridUnsafeMemory mem, int totalReducerCnt, int[] locReducers) throws IgniteCheckedException { this.locReduceAddr = locReduceAddr; + this.totalReducerCnt = totalReducerCnt; this.job = job; this.mem = mem; this.log = log.getLogger(HadoopShuffleJob.class); msgSize = get(job.info(), SHUFFLE_MSG_SIZE, DFLT_SHUFFLE_MSG_SIZE); + locReducersCtx = new AtomicReferenceArray<>(totalReducerCnt); + if (!F.isEmpty(locReducers)) { for (int rdc : locReducers) { HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.REDUCE, job.id(), rdc, 0, null); - reducersCtx.put(rdc, new LocalTaskContextProxy(taskInfo)); + locReducersCtx.set(rdc, new LocalTaskContextProxy(taskInfo)); } } needPartitioner = totalReducerCnt > 1; - maps = new AtomicReferenceArray<>(totalReducerCnt); + locMaps = new AtomicReferenceArray<>(totalReducerCnt); + rmtMaps = new AtomicReferenceArray<>(totalReducerCnt); msgs = new HadoopShuffleMessage[totalReducerCnt]; throttle = get(job.info(), SHUFFLE_JOB_THROTTLE, 0); @@ -237,13 +245,13 @@ public class HadoopShuffleJob<T> implements AutoCloseable { assert msg.buffer() != null; assert msg.offset() > 0; - HadoopTaskContext taskCtx = reducersCtx.get(msg.reducer()).get(); + HadoopTaskContext taskCtx = locReducersCtx.get(msg.reducer()).get(); HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(taskCtx.counters(), null); perfCntr.onShuffleMessage(msg.reducer(), U.currentTimeMillis()); - HadoopMultimap map = getOrCreateMap(maps, msg.reducer()); + HadoopMultimap map = getOrCreateMap(locMaps, msg.reducer()); // Add data from message to the map. try (HadoopMultimap.Adder adder = map.startAdding(taskCtx)) { @@ -320,10 +328,10 @@ public class HadoopShuffleJob<T> implements AutoCloseable { * Sends map updates to remote reducers. */ private void collectUpdatesAndSend(boolean flush) throws IgniteCheckedException { - for (int i = 0; i < maps.length(); i++) { - HadoopMultimap map = maps.get(i); + for (int i = 0; i < rmtMaps.length(); i++) { + HadoopMultimap map = rmtMaps.get(i); - if (map == null || locReduceAddr.equals(reduceAddrs[i])) + if (map == null) continue; // Skip empty map and local node. if (msgs[i] == null) @@ -448,7 +456,8 @@ public class HadoopShuffleJob<T> implements AutoCloseable { } } - close(maps); + close(locMaps); + close(rmtMaps); } /** @@ -473,7 +482,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable { flushed = true; - if (maps.length() == 0) + if (totalReducerCnt == 0) return new GridFinishedFuture<>(); U.await(ioInitLatch); @@ -544,7 +553,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable { case REDUCE: int reducer = taskCtx.taskInfo().taskNumber(); - HadoopMultimap m = maps.get(reducer); + HadoopMultimap m = locMaps.get(reducer); if (m != null) return m.input(taskCtx); @@ -573,11 +582,24 @@ public class HadoopShuffleJob<T> implements AutoCloseable { } /** + * Check if certain partition (reducer) is local. + * + * @param part Partition. + * @return {@code True} if local. + */ + private boolean isLocalPartition(int part) { + return locReducersCtx.get(part) != null; + } + + /** * Partitioned output. */ private class PartitionedOutput implements HadoopTaskOutput { /** */ - private final HadoopTaskOutput[] adders = new HadoopTaskOutput[maps.length()]; + private final HadoopTaskOutput[] locAdders = new HadoopTaskOutput[locMaps.length()]; + + /** */ + private final HadoopTaskOutput[] rmtAdders = new HadoopTaskOutput[rmtMaps.length()]; /** */ private HadoopPartitioner partitioner; @@ -601,23 +623,38 @@ public class HadoopShuffleJob<T> implements AutoCloseable { int part = 0; if (partitioner != null) { - part = partitioner.partition(key, val, adders.length); + part = partitioner.partition(key, val, totalReducerCnt); - if (part < 0 || part >= adders.length) + if (part < 0 || part >= totalReducerCnt) throw new IgniteCheckedException("Invalid partition: " + part); } - HadoopTaskOutput out = adders[part]; + HadoopTaskOutput out; - if (out == null) - adders[part] = out = getOrCreateMap(maps, part).startAdding(taskCtx); + if (isLocalPartition(part)) { + out = locAdders[part]; + + if (out == null) + locAdders[part] = out = getOrCreateMap(locMaps, part).startAdding(taskCtx); + } + else { + out = rmtAdders[part]; + + if (out == null) + rmtAdders[part] = out = getOrCreateMap(rmtMaps, part).startAdding(taskCtx); + } out.write(key, val); } /** {@inheritDoc} */ @Override public void close() throws IgniteCheckedException { - for (HadoopTaskOutput adder : adders) { + for (HadoopTaskOutput adder : locAdders) { + if (adder != null) + adder.close(); + } + + for (HadoopTaskOutput adder : rmtAdders) { if (adder != null) adder.close(); }
