IGNITE-4274: Hadoop: added new property to control shuffle message size.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2488f340 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2488f340 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2488f340 Branch: refs/heads/master Commit: 2488f340c85301c0ec39cac80a8426f5f4c3caf0 Parents: ae903c5 Author: devozerov <[email protected]> Authored: Mon Dec 5 15:28:54 2016 +0300 Committer: devozerov <[email protected]> Committed: Thu Dec 15 13:45:26 2016 +0300 ---------------------------------------------------------------------- .../processors/hadoop/HadoopJobProperty.java | 37 ++++++++++++-------- .../hadoop/shuffle/HadoopShuffleJob.java | 12 +++++-- 2 files changed, 32 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/2488f340/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java index e7bf565..e713caa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java @@ -28,40 +28,40 @@ public enum HadoopJobProperty { * <p> * Setting it right allows to avoid rehashing. */ - COMBINER_HASHMAP_SIZE, + COMBINER_HASHMAP_SIZE("ignite.combiner.hashmap.size"), /** * Initial size for hashmap which stores output of mapper or combiner and will be used as input of reducer. * <p> * Setting it right allows to avoid rehashing. 
*/ - PARTITION_HASHMAP_SIZE, + PARTITION_HASHMAP_SIZE("ignite.partition.hashmap.size"), /** * Specifies number of concurrently running mappers for external execution mode. * <p> * If not specified, defaults to {@code Runtime.getRuntime().availableProcessors()}. */ - EXTERNAL_CONCURRENT_MAPPERS, + EXTERNAL_CONCURRENT_MAPPERS("ignite.external.concurrent.mappers"), /** * Specifies number of concurrently running reducers for external execution mode. * <p> * If not specified, defaults to {@code Runtime.getRuntime().availableProcessors()}. */ - EXTERNAL_CONCURRENT_REDUCERS, + EXTERNAL_CONCURRENT_REDUCERS("ignite.external.concurrent.reducers"), /** * Delay in milliseconds after which Ignite server will reply job status. */ - JOB_STATUS_POLL_DELAY, + JOB_STATUS_POLL_DELAY("ignite.job.status.poll.delay"), /** * Size in bytes of single memory page which will be allocated for data structures in shuffle. * <p> * By default is {@code 32 * 1024}. */ - SHUFFLE_OFFHEAP_PAGE_SIZE, + SHUFFLE_OFFHEAP_PAGE_SIZE("ignite.shuffle.offheap.page.size"), /** * If set to {@code true} then input for combiner will not be sorted by key. @@ -71,7 +71,7 @@ public enum HadoopJobProperty { * <p> * By default is {@code false}. */ - SHUFFLE_COMBINER_NO_SORTING, + SHUFFLE_COMBINER_NO_SORTING("ignite.shuffle.combiner.no.sorting"), /** * If set to {@code true} then input for reducer will not be sorted by key. @@ -81,7 +81,14 @@ public enum HadoopJobProperty { * <p> * By default is {@code false}. */ - SHUFFLE_REDUCER_NO_SORTING, + SHUFFLE_REDUCER_NO_SORTING("ignite.shuffle.reducer.no.sorting"), + + /** + * Defines approximate size in bytes of shuffle message which will be passed over wire from mapper to reducer. + * <p> + * Defaults to 128Kb. + */ + SHUFFLE_MSG_SIZE("ignite.shuffle.message.size"), /** * Shuffle job throttle in milliseconds. When job is executed with separate shuffle thread, this parameter @@ -89,23 +96,25 @@ public enum HadoopJobProperty { * <p> * Defaults to {@code 0}. 
*/ - SHUFFLE_JOB_THROTTLE; + SHUFFLE_JOB_THROTTLE("ignite.shuffle.job.throttle"); - /** */ - private final String ptyName; + /** Property name. */ + private final String propName; /** + * Constructor. * + * @param propName Property name. */ - HadoopJobProperty() { - ptyName = "ignite." + name().toLowerCase().replace('_', '.'); + HadoopJobProperty(String propName) { + this.propName = propName; } /** * @return Property name. */ public String propertyName() { - return ptyName; + return propName; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/2488f340/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java index 8c731c0..e5af8f1 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java @@ -55,6 +55,7 @@ import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.thread.IgniteThread; import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.PARTITION_HASHMAP_SIZE; +import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_MSG_SIZE; import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_JOB_THROTTLE; import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_REDUCER_NO_SORTING; import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.get; @@ -64,7 +65,7 @@ import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.get */ public class HadoopShuffleJob<T> implements AutoCloseable { /** */ - private static final 
int MSG_BUF_SIZE = 128 * 1024; + private static final int DFLT_SHUFFLE_MSG_SIZE = 128 * 1024; /** */ private final HadoopJob job; @@ -109,6 +110,9 @@ public class HadoopShuffleJob<T> implements AutoCloseable { /** */ private final IgniteLogger log; + /** Message size. */ + private final int msgSize; + /** */ private final long throttle; @@ -128,6 +132,8 @@ public class HadoopShuffleJob<T> implements AutoCloseable { this.mem = mem; this.log = log.getLogger(HadoopShuffleJob.class); + msgSize = get(job.info(), SHUFFLE_MSG_SIZE, DFLT_SHUFFLE_MSG_SIZE); + if (!F.isEmpty(locReducers)) { for (int rdc : locReducers) { HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.REDUCE, job.id(), rdc, 0, null); @@ -320,7 +326,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable { continue; // Skip empty map and local node. if (msgs[i] == null) - msgs[i] = new HadoopShuffleMessage(job.id(), i, MSG_BUF_SIZE); + msgs[i] = new HadoopShuffleMessage(job.id(), i, msgSize); final int idx = i; @@ -425,7 +431,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable { }); msgs[idx] = newBufMinSize == 0 ? null : new HadoopShuffleMessage(job.id(), idx, - Math.max(MSG_BUF_SIZE, newBufMinSize)); + Math.max(msgSize, newBufMinSize)); } /** {@inheritDoc} */
