IGNITE-4355: Hadoop: Implemented parallel task context initialization during shuffle. This closes #1310. This closes #1313.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/04cff9b5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/04cff9b5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/04cff9b5 Branch: refs/heads/master Commit: 04cff9b53df1da21c7552b2e47df50cacbae9158 Parents: b7b97cf Author: devozerov <[email protected]> Authored: Mon Dec 5 16:28:05 2016 +0300 Committer: devozerov <[email protected]> Committed: Thu Dec 15 13:45:42 2016 +0300 ---------------------------------------------------------------------- .../hadoop/shuffle/HadoopShuffle.java | 23 ++++++--- .../hadoop/shuffle/HadoopShuffleJob.java | 53 ++++++++++++++++++-- 2 files changed, 66 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/04cff9b5/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java index a69e779..4450bf2 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java @@ -53,6 +53,9 @@ public class HadoopShuffle extends HadoopComponent { /** */ protected final GridUnsafeMemory mem = new GridUnsafeMemory(0); + /** Mutex for internal synchronization. 
*/ + private final Object mux = new Object(); + /** {@inheritDoc} */ @Override public void start(HadoopContext ctx) throws IgniteCheckedException { super.start(ctx); @@ -141,17 +144,23 @@ public class HadoopShuffle extends HadoopComponent { HadoopShuffleJob<UUID> res = jobs.get(jobId); if (res == null) { - res = newJob(jobId); + synchronized (mux) { + res = jobs.get(jobId); + + if (res == null) { + res = newJob(jobId); - HadoopShuffleJob<UUID> old = jobs.putIfAbsent(jobId, res); + HadoopShuffleJob<UUID> old = jobs.putIfAbsent(jobId, res); - if (old != null) { - res.close(); + if (old != null) { + res.close(); - res = old; + res = old; + } + else if (res.reducersInitialized()) + startSending(res); + } } - else if (res.reducersInitialized()) - startSending(res); } return res; http://git-wip-us.apache.org/repos/asf/ignite/blob/04cff9b5/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java index 9392b2c..aca5fdf 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java @@ -78,7 +78,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable { private final boolean needPartitioner; /** Collection of task contexts for each reduce task. */ - private final Map<Integer, HadoopTaskContext> reducersCtx = new HashMap<>(); + private final Map<Integer, LocalTaskContextProxy> reducersCtx = new HashMap<>(); /** Reducers addresses. 
*/ private T[] reduceAddrs; @@ -139,7 +139,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable { for (int rdc : locReducers) { HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.REDUCE, job.id(), rdc, 0, null); - reducersCtx.put(rdc, job.getTaskContext(taskInfo)); + reducersCtx.put(rdc, new LocalTaskContextProxy(taskInfo)); } } @@ -237,7 +237,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable { assert msg.buffer() != null; assert msg.offset() > 0; - HadoopTaskContext taskCtx = reducersCtx.get(msg.reducer()); + HadoopTaskContext taskCtx = reducersCtx.get(msg.reducer()).get(); HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(taskCtx.counters(), null); @@ -623,4 +623,51 @@ public class HadoopShuffleJob<T> implements AutoCloseable { } } } + + /** + * Local task context proxy with delayed initialization. + */ + private class LocalTaskContextProxy { + /** Mutex for synchronization. */ + private final Object mux = new Object(); + + /** Task info. */ + private final HadoopTaskInfo taskInfo; + + /** Task context. */ + private volatile HadoopTaskContext ctx; + + /** + * Constructor. + * + * @param taskInfo Task info. + */ + public LocalTaskContextProxy(HadoopTaskInfo taskInfo) { + this.taskInfo = taskInfo; + } + + /** + * Get task context. + * + * @return Task context. + * @throws IgniteCheckedException If failed. + */ + public HadoopTaskContext get() throws IgniteCheckedException { + HadoopTaskContext ctx0 = ctx; + + if (ctx0 == null) { + synchronized (mux) { + ctx0 = ctx; + + if (ctx0 == null) { + ctx0 = job.getTaskContext(taskInfo); + + ctx = ctx0; + } + } + } + + return ctx0; + } + } } \ No newline at end of file
