IGNITE-4276: Hadoop: added configurable throttle for shuffle message sending. 
Disabled by default.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ae903c59
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ae903c59
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ae903c59

Branch: refs/heads/master
Commit: ae903c59c10bb9fad58842221f3e720d1b4d54b8
Parents: bf1770b
Author: devozerov <[email protected]>
Authored: Mon Dec 5 14:57:25 2016 +0300
Committer: devozerov <[email protected]>
Committed: Thu Dec 15 13:44:58 2016 +0300

----------------------------------------------------------------------
 .../internal/processors/hadoop/HadoopJobProperty.java     | 10 +++++++++-
 .../processors/hadoop/shuffle/HadoopShuffleJob.java       |  9 ++++++++-
 2 files changed, 17 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ae903c59/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
index dcfbcba..e7bf565 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
@@ -81,7 +81,15 @@ public enum HadoopJobProperty {
      * <p>
      * By default is {@code false}.
      */
-    SHUFFLE_REDUCER_NO_SORTING;
+    SHUFFLE_REDUCER_NO_SORTING,
+
+    /**
+     * Shuffle job throttle in milliseconds. When job is executed with 
separate shuffle thread, this parameter
+     * controls sleep duration between iterations through intermediate reducer 
maps.
+     * <p>
+     * Defaults to {@code 0}.
+     */
+    SHUFFLE_JOB_THROTTLE;
 
     /** */
     private final String ptyName;

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae903c59/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
index b940c72..8c731c0 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
@@ -55,6 +55,7 @@ import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.thread.IgniteThread;
 
 import static 
org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.PARTITION_HASHMAP_SIZE;
+import static 
org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_JOB_THROTTLE;
 import static 
org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_REDUCER_NO_SORTING;
 import static 
org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.get;
 
@@ -108,6 +109,9 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
     /** */
     private final IgniteLogger log;
 
+    /** */
+    private final long throttle;
+
     /**
      * @param locReduceAddr Local reducer address.
      * @param log Logger.
@@ -136,6 +140,8 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
 
         maps = new AtomicReferenceArray<>(totalReducerCnt);
         msgs = new HadoopShuffleMessage[totalReducerCnt];
+
+        throttle = get(job.info(), SHUFFLE_JOB_THROTTLE, 0);
     }
 
     /**
@@ -175,7 +181,8 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
                 @Override protected void body() throws InterruptedException {
                     try {
                         while (!isCancelled()) {
-                            Thread.sleep(5);
+                            if (throttle > 0)
+                                Thread.sleep(throttle);
 
                             collectUpdatesAndSend(false);
                         }

Reply via email to