Repository: incubator-reef Updated Branches: refs/heads/master cb06c5120 -> bd9326aab
[REEF 715] Allow to create GroupCommDrivers with different fanOuts This change adds a method to the public interface of GroupCommDriver to allow creation of groups with different fan outs. This change is needed when we have multiple communication groups and we want to specify different topologies among them. JIRA: [REEF-715](https://issues.apache.org/jira/browse/REEF-715) Pull request: This closes #464 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/bd9326aa Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/bd9326aa Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/bd9326aa Branch: refs/heads/master Commit: bd9326aabf3c3b955d4063906a3614bff7587d38 Parents: cb06c51 Author: Ignacio Cano <nachoac...@gmail.com> Authored: Thu Sep 3 14:02:08 2015 -0700 Committer: Markus Weimer <wei...@apache.org> Committed: Thu Sep 3 18:18:54 2015 -0700 ---------------------------------------------------------------------- .../io/network/group/api/driver/GroupCommDriver.java | 13 +++++++++++++ .../network/group/impl/driver/GroupCommDriverImpl.java | 8 +++++++- 2 files changed, 20 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/bd9326aa/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/GroupCommDriver.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/GroupCommDriver.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/GroupCommDriver.java index f8ca101..d984c24 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/GroupCommDriver.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/GroupCommDriver.java @@ -47,6 +47,19 @@ public interface GroupCommDriver { CommunicationGroupDriver newCommunicationGroup(Class<? extends Name<String>> groupName, int numberOfTasks); /** + * Create a new communication group with the specified name, + * the minimum number of tasks needed in this group before + * communication can start, and a custom fanOut. + * + * @param groupName + * @param numberOfTasks + * @param customFanOut + * @return + */ + CommunicationGroupDriver newCommunicationGroup(Class<? extends Name<String>> groupName, int numberOfTasks, + int customFanOut); + + /** * Tests whether the activeContext is a context configured. * using the Group Communication Service * http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/bd9326aa/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java index c9de6e8..9f3b083 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java @@ -181,6 +181,12 @@ public class GroupCommDriverImpl implements GroupCommServiceDriver { @Override public CommunicationGroupDriver newCommunicationGroup(final Class<? extends Name<String>> groupName, final int numberOfTasks) { + return newCommunicationGroup(groupName, numberOfTasks, fanOut); + } + + @Override + public CommunicationGroupDriver newCommunicationGroup(final Class<? extends Name<String>> groupName, + final int numberOfTasks, final int customFanOut) { LOG.entering("GroupCommDriverImpl", "newCommunicationGroup", new Object[]{Utils.simpleName(groupName), numberOfTasks}); final BroadcastingEventHandler<RunningTask> commGroupRunningTaskHandler = new BroadcastingEventHandler<>(); @@ -194,7 +200,7 @@ public class GroupCommDriverImpl implements GroupCommServiceDriver { commGroupFailedTaskHandler, commGroupFailedEvaluatorHandler, commGroupMessageHandler, - driverId, numberOfTasks, fanOut); + driverId, numberOfTasks, customFanOut); commGroupDrivers.put(groupName, commGroupDriver); groupCommRunningTaskHandler.addHandler(commGroupRunningTaskHandler); groupCommFailedTaskHandler.addHandler(commGroupFailedTaskHandler);