Repository: incubator-reef
Updated Branches:
  refs/heads/master cb06c5120 -> bd9326aab


[REEF-715] Allow creating communication groups with different fanOuts

This change adds a method to the public interface of GroupCommDriver
that allows creating communication groups with a custom fan-out.
It is needed when there are multiple communication groups and each
of them should be able to use a different topology.

JIRA:
  [REEF-715](https://issues.apache.org/jira/browse/REEF-715)

Pull request:
  This closes #464


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/bd9326aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/bd9326aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/bd9326aa

Branch: refs/heads/master
Commit: bd9326aabf3c3b955d4063906a3614bff7587d38
Parents: cb06c51
Author: Ignacio Cano <nachoac...@gmail.com>
Authored: Thu Sep 3 14:02:08 2015 -0700
Committer: Markus Weimer <wei...@apache.org>
Committed: Thu Sep 3 18:18:54 2015 -0700

----------------------------------------------------------------------
 .../io/network/group/api/driver/GroupCommDriver.java   | 13 +++++++++++++
 .../network/group/impl/driver/GroupCommDriverImpl.java |  8 +++++++-
 2 files changed, 20 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/bd9326aa/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/GroupCommDriver.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/GroupCommDriver.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/GroupCommDriver.java
index f8ca101..d984c24 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/GroupCommDriver.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/GroupCommDriver.java
@@ -47,6 +47,19 @@ public interface GroupCommDriver {
   CommunicationGroupDriver newCommunicationGroup(Class<? extends Name<String>> 
groupName, int numberOfTasks);
 
   /**
+   * Create a new communication group with the specified name,
+   * the minimum number of tasks needed in this group before
+   * communication can start, and a custom fanOut.
+   *
+   * @param groupName
+   * @param numberOfTasks
+   * @param customFanOut
+   * @return
+   */
+  CommunicationGroupDriver newCommunicationGroup(Class<? extends Name<String>> 
groupName, int numberOfTasks,
+      int customFanOut);
+
+  /**
    * Tests whether the activeContext is a context configured.
    * using the Group Communication Service
    *

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/bd9326aa/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java
index c9de6e8..9f3b083 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java
@@ -181,6 +181,12 @@ public class GroupCommDriverImpl implements 
GroupCommServiceDriver {
   @Override
   public CommunicationGroupDriver newCommunicationGroup(final Class<? extends 
Name<String>> groupName,
                                                         final int 
numberOfTasks) {
+    return newCommunicationGroup(groupName, numberOfTasks, fanOut);
+  }
+
+  @Override
+  public CommunicationGroupDriver newCommunicationGroup(final Class<? extends 
Name<String>> groupName,
+                                                        final int 
numberOfTasks, final int customFanOut) {
     LOG.entering("GroupCommDriverImpl", "newCommunicationGroup",
         new Object[]{Utils.simpleName(groupName), numberOfTasks});
     final BroadcastingEventHandler<RunningTask> commGroupRunningTaskHandler = 
new BroadcastingEventHandler<>();
@@ -194,7 +200,7 @@ public class GroupCommDriverImpl implements 
GroupCommServiceDriver {
         commGroupFailedTaskHandler,
         commGroupFailedEvaluatorHandler,
         commGroupMessageHandler,
-        driverId, numberOfTasks, fanOut);
+        driverId, numberOfTasks, customFanOut);
     commGroupDrivers.put(groupName, commGroupDriver);
     groupCommRunningTaskHandler.addHandler(commGroupRunningTaskHandler);
     groupCommFailedTaskHandler.addHandler(commGroupFailedTaskHandler);

Reply via email to