IGNITE-4276: Hadoop: added configurable throttle for shuffle message sending. Disabled by default.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ae903c59 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ae903c59 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ae903c59 Branch: refs/heads/master Commit: ae903c59c10bb9fad58842221f3e720d1b4d54b8 Parents: bf1770b Author: devozerov <[email protected]> Authored: Mon Dec 5 14:57:25 2016 +0300 Committer: devozerov <[email protected]> Committed: Thu Dec 15 13:44:58 2016 +0300 ---------------------------------------------------------------------- .../internal/processors/hadoop/HadoopJobProperty.java | 10 +++++++++- .../processors/hadoop/shuffle/HadoopShuffleJob.java | 9 ++++++++- 2 files changed, 17 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ae903c59/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java index dcfbcba..e7bf565 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java @@ -81,7 +81,15 @@ public enum HadoopJobProperty { * <p> * By default is {@code false}. */ - SHUFFLE_REDUCER_NO_SORTING; + SHUFFLE_REDUCER_NO_SORTING, + + /** + * Shuffle job throttle in milliseconds. When job is executed with separate shuffle thread, this parameter + * controls sleep duration between iterations through intermediate reducer maps. + * <p> + * Defaults to {@code 0}. + */ + SHUFFLE_JOB_THROTTLE; /** */ private final String ptyName; http://git-wip-us.apache.org/repos/asf/ignite/blob/ae903c59/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java index b940c72..8c731c0 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java @@ -55,6 +55,7 @@ import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.thread.IgniteThread; import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.PARTITION_HASHMAP_SIZE; +import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_JOB_THROTTLE; import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_REDUCER_NO_SORTING; import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.get; @@ -108,6 +109,9 @@ public class HadoopShuffleJob<T> implements AutoCloseable { /** */ private final IgniteLogger log; + /** */ + private final long throttle; + /** * @param locReduceAddr Local reducer address. * @param log Logger. @@ -136,6 +140,8 @@ public class HadoopShuffleJob<T> implements AutoCloseable { maps = new AtomicReferenceArray<>(totalReducerCnt); msgs = new HadoopShuffleMessage[totalReducerCnt]; + + throttle = get(job.info(), SHUFFLE_JOB_THROTTLE, 0); } /** @@ -175,7 +181,8 @@ public class HadoopShuffleJob<T> implements AutoCloseable { @Override protected void body() throws InterruptedException { try { while (!isCancelled()) { - Thread.sleep(5); + if (throttle > 0) + Thread.sleep(throttle); collectUpdatesAndSend(false); }
