[REEF-126]: Merge Enhanced Features into Shimoga

This addressed the issue by
  * adding Gather and Scatter operations and the corresponding configurations
  * modifying the OperatorTopology classes to support Gather and Scatter
  * changing the TopologyUpdate message so that
    the Driver sends the current topology info for Scatter

JIRA: [REEF-126](https://issues.apache.org/jira/browse/REEF-126)

Pull request:
  This closes #226


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/05792696
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/05792696
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/05792696

Branch: refs/heads/master
Commit: 05792696abfe2ad4e5913fbd0af3e1ad78c0d1d6
Parents: 8641366
Author: Jason (Joo Seong) Jeong <[email protected]>
Authored: Tue Jun 16 20:48:37 2015 +0900
Committer: Brian Cho <[email protected]>
Committed: Sun Aug 2 18:14:29 2015 +0900

----------------------------------------------------------------------
 lang/java/reef-io/pom.xml                       |   4 +
 .../api/driver/CommunicationGroupDriver.java    |  20 ++
 .../io/network/group/api/driver/TaskNode.java   |   2 +
 .../io/network/group/api/operators/Gather.java  |   5 +
 .../io/network/group/api/operators/Scatter.java |   5 +
 .../api/task/CommunicationGroupClient.java      |  52 +++++
 .../group/api/task/OperatorTopology.java        |   9 +-
 .../group/api/task/OperatorTopologyStruct.java  |   7 +-
 .../group/impl/config/GatherOperatorSpec.java   |  82 ++++++++
 .../group/impl/config/ScatterOperatorSpec.java  |  83 ++++++++
 .../driver/CommunicationGroupDriverImpl.java    |  38 ++++
 .../network/group/impl/driver/FlatTopology.java |   2 +-
 .../network/group/impl/driver/TaskNodeImpl.java |   7 +
 .../group/impl/driver/TopologySerializer.java   | 104 ++++++++++
 .../group/impl/driver/TopologySimpleNode.java   |  44 ++++
 .../impl/driver/TopologyUpdateWaitHandler.java  |   6 +-
 .../network/group/impl/driver/TreeTopology.java |  23 ++-
 .../group/impl/operators/BroadcastReceiver.java |   2 +-
 .../group/impl/operators/GatherReceiver.java    | 192 ++++++++++++++++++
 .../group/impl/operators/GatherSender.java      | 156 ++++++++++++++
 .../group/impl/operators/ScatterReceiver.java   | 164 +++++++++++++++
 .../group/impl/operators/ScatterSender.java     | 203 +++++++++++++++++++
 .../impl/task/CommunicationGroupClientImpl.java | 114 ++++++++++-
 .../group/impl/task/OperatorTopologyImpl.java   |  28 ++-
 .../impl/task/OperatorTopologyStructImpl.java   | 147 ++++++++++----
 .../network/group/impl/utils/ScatterData.java   |  56 +++++
 .../group/impl/utils/ScatterDecoder.java        |  66 ++++++
 .../group/impl/utils/ScatterEncoder.java        | 147 ++++++++++++++
 .../network/group/impl/utils/ScatterHelper.java |  59 ++++++
 .../network/group/TopologySerializerTest.java   |  72 +++++++
 .../group/impl/utils/ScatterCodecTest.java      |  93 +++++++++
 .../group/impl/utils/ScatterHelperTest.java     |  56 +++++
 .../network/group/impl/utils/package-info.java  |  22 ++
 33 files changed, 2017 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/pom.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/pom.xml b/lang/java/reef-io/pom.xml
index 6732b67..279cedf 100644
--- a/lang/java/reef-io/pom.xml
+++ b/lang/java/reef-io/pom.xml
@@ -143,6 +143,10 @@ under the License.
             <version>${hadoop.version}</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+        </dependency>
         <!-- END OF HADOOP -->
     </dependencies>
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/CommunicationGroupDriver.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/CommunicationGroupDriver.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/CommunicationGroupDriver.java
index 3c5bfa2..9d65355 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/CommunicationGroupDriver.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/CommunicationGroupDriver.java
@@ -20,7 +20,9 @@ package org.apache.reef.io.network.group.api.driver;
 
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.io.network.group.impl.config.BroadcastOperatorSpec;
+import org.apache.reef.io.network.group.impl.config.GatherOperatorSpec;
 import org.apache.reef.io.network.group.impl.config.ReduceOperatorSpec;
+import org.apache.reef.io.network.group.impl.config.ScatterOperatorSpec;
 import org.apache.reef.tang.Configuration;
 import org.apache.reef.tang.annotations.Name;
 
@@ -56,6 +58,24 @@ public interface CommunicationGroupDriver {
   CommunicationGroupDriver addReduce(Class<? extends Name<String>> 
operatorName, ReduceOperatorSpec spec);
 
   /**
+   * Add the scatter operator specified by {@code operatorName} and {@code 
spec}.
+   *
+   * @param operatorName
+   * @param spec
+   * @return
+   */
+  CommunicationGroupDriver addScatter(Class<? extends Name<String>> 
operatorName, ScatterOperatorSpec spec);
+
+  /**
+   * Add the gather operator specified by {@code operatorName} and {@code 
spec}.
+   *
+   * @param operatorName
+   * @param spec
+   * @return
+   */
+  CommunicationGroupDriver addGather(Class<? extends Name<String>> 
operatorName, GatherOperatorSpec spec);
+
+  /**
    * This signals to the service that no more.
    * operator specs will be added to this communication
    * group and an attempt to do that will throw an

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/TaskNode.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/TaskNode.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/TaskNode.java
index 8a626cf..e2bb1fd 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/TaskNode.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/TaskNode.java
@@ -38,6 +38,8 @@ public interface TaskNode {
 
   void setParent(TaskNode parent);
 
+  Iterable<TaskNode> getChildren();
+
   void addChild(TaskNode child);
 
   void removeChild(TaskNode taskNode);

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Gather.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Gather.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Gather.java
index dc356d7..bd55629 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Gather.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Gather.java
@@ -19,6 +19,9 @@
 package org.apache.reef.io.network.group.api.operators;
 
 import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.group.impl.operators.GatherReceiver;
+import org.apache.reef.io.network.group.impl.operators.GatherSender;
+import org.apache.reef.tang.annotations.DefaultImplementation;
 import org.apache.reef.wake.Identifier;
 
 import java.util.List;
@@ -34,6 +37,7 @@ public interface Gather {
   /**
    * Senders or non-roots.
    */
+  @DefaultImplementation(GatherSender.class)
   interface Sender<T> extends GroupCommOperator {
 
     /**
@@ -45,6 +49,7 @@ public interface Gather {
   /**
    * Receiver or Root.
    */
+  @DefaultImplementation(GatherReceiver.class)
   interface Receiver<T> extends GroupCommOperator {
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Scatter.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Scatter.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Scatter.java
index 5dba002..5ded0a0 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Scatter.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Scatter.java
@@ -19,6 +19,9 @@
 package org.apache.reef.io.network.group.api.operators;
 
 import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.group.impl.operators.ScatterReceiver;
+import org.apache.reef.io.network.group.impl.operators.ScatterSender;
+import org.apache.reef.tang.annotations.DefaultImplementation;
 import org.apache.reef.wake.Identifier;
 
 import java.util.List;
@@ -35,6 +38,7 @@ public interface Scatter {
   /**
    * Sender or Root.
    */
+  @DefaultImplementation(ScatterSender.class)
   interface Sender<T> extends GroupCommOperator {
 
     /**
@@ -63,6 +67,7 @@ public interface Scatter {
   /**
    * Receiver or non-roots.
    */
+  @DefaultImplementation(ScatterReceiver.class)
   interface Receiver<T> extends GroupCommOperator {
     /**
      * Receive the sub-list of elements targeted for the current receiver.

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommunicationGroupClient.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommunicationGroupClient.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommunicationGroupClient.java
index 22314ad..4aa5217 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommunicationGroupClient.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/CommunicationGroupClient.java
@@ -20,11 +20,17 @@ package org.apache.reef.io.network.group.api.task;
 
 import org.apache.reef.annotations.audience.TaskSide;
 import org.apache.reef.io.network.group.api.operators.Broadcast;
+import org.apache.reef.io.network.group.api.operators.Gather;
 import org.apache.reef.io.network.group.api.operators.Reduce;
 import org.apache.reef.io.network.group.api.GroupChanges;
+import org.apache.reef.io.network.group.api.operators.Scatter;
+import org.apache.reef.io.network.group.impl.driver.TopologySimpleNode;
 import org.apache.reef.io.network.group.impl.task.CommunicationGroupClientImpl;
 import org.apache.reef.tang.annotations.DefaultImplementation;
 import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.wake.Identifier;
+
+import java.util.List;
 
 /**
  * The Task side interface of a communication group.
@@ -78,12 +84,58 @@ public interface CommunicationGroupClient {
   Reduce.Sender getReduceSender(Class<? extends Name<String>> operatorName);
 
   /**
+   * Return the scatter sender configured on this communication group.
+   * {@code operatorName} is used to specify the scatter sender to return.
+   *
+   * @param operatorName
+   * @return
+   */
+  Scatter.Sender getScatterSender(Class<? extends Name<String>> operatorName);
+
+  /**
+   * Return the scatter receiver configured on this communication group.
+   * {@code operatorName} is used to specify the scatter receiver to return.
+   *
+   * @param operatorName
+   * @return
+   */
+  Scatter.Receiver getScatterReceiver(Class<? extends Name<String>> 
operatorName);
+
+  /**
+   * Return the gather receiver configured on this communication group.
+   * {@code operatorName} is used to specify the gather receiver to return.
+   *
+   * @param operatorName
+   * @return
+   */
+  Gather.Receiver getGatherReceiver(Class<? extends Name<String>> 
operatorName);
+
+  /**
+   * Return the gather sender configured on this communication group.
+   * {@code operatorName} is used to specify the gather sender to return.
+   *
+   * @param operatorName
+   * @return
+   */
+  Gather.Sender getGatherSender(Class<? extends Name<String>> operatorName);
+
+  /**
    * @return Changes in topology of this communication group since the last 
time
    * this method was called
    */
   GroupChanges getTopologyChanges();
 
   /**
+   * @return list of current active tasks, last updated during updateTopology()
+   */
+  List<Identifier> getActiveSlaveTasks();
+
+  /**
+   * @return root node of simplified topology representation
+   */
+  TopologySimpleNode getTopologySimpleNodeRoot();
+
+  /**
    * Asks the driver to update the topology of this communication group. This 
can
    * be an expensive call depending on what the minimum number of tasks is for 
this
    * group to function as this first tells the driver, driver then tells the 
affected

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopology.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopology.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopology.java
index 0b55259..7bc181b 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopology.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopology.java
@@ -24,6 +24,8 @@ import 
org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
 import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
 import org.apache.reef.io.serialization.Codec;
 
+import java.util.Map;
+
 /**
  * Represents the local topology of tasks for an operator. It
  * provides methods to send/rcv from parents & children
@@ -48,11 +50,16 @@ public interface OperatorTopology {
 
   void sendToParent(byte[] encode, 
ReefNetworkGroupCommProtos.GroupCommMessage.Type reduce) throws 
ParentDeadException;
 
-  byte[] recvFromParent() throws ParentDeadException;
+  byte[] recvFromParent(ReefNetworkGroupCommProtos.GroupCommMessage.Type 
msgType) throws ParentDeadException;
 
   void sendToChildren(byte[] data, 
ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType) throws 
ParentDeadException;
 
+  void sendToChildren(Map<String, byte[]> dataMap,
+                      ReefNetworkGroupCommProtos.GroupCommMessage.Type 
msgType) throws ParentDeadException;
+
   <T> T recvFromChildren(ReduceFunction<T> redFunc, Codec<T> dataCodec) throws 
ParentDeadException;
 
+  byte[] recvFromChildren() throws ParentDeadException;
+
   void initialize() throws ParentDeadException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopologyStruct.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopologyStruct.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopologyStruct.java
index 969134d..4d659a3 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopologyStruct.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/OperatorTopologyStruct.java
@@ -26,6 +26,7 @@ import org.apache.reef.io.serialization.Codec;
 import org.apache.reef.tang.annotations.Name;
 
 import java.util.Collection;
+import java.util.Map;
 import java.util.Set;
 
 /**
@@ -65,9 +66,13 @@ public interface OperatorTopologyStruct {
 
   void sendToParent(byte[] data, 
ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType);
 
-  byte[] recvFromParent();
+  byte[] recvFromParent(ReefNetworkGroupCommProtos.GroupCommMessage.Type 
msgType);
 
   void sendToChildren(byte[] data, 
ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType);
 
+  void sendToChildren(Map<String, byte[]> dataMap, 
ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType);
+
   <T> T recvFromChildren(ReduceFunction<T> redFunc, Codec<T> dataCodec);
+
+  byte[] recvFromChildren();
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/GatherOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/GatherOperatorSpec.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/GatherOperatorSpec.java
new file mode 100644
index 0000000..8eb2a90
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/GatherOperatorSpec.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.config;
+
+import org.apache.reef.io.network.group.api.config.OperatorSpec;
+import org.apache.reef.io.network.group.impl.utils.Utils;
+import org.apache.reef.io.serialization.Codec;
+
+/**
+ * The specification for the Gather operator.
+ */
+public class GatherOperatorSpec implements OperatorSpec {
+
+  private final String receiverId;
+  private final Class<? extends Codec> dataCodecClass;
+
+  public GatherOperatorSpec(final String receiverId,
+                            final Class<? extends Codec> dataCodecClass) {
+    this.receiverId = receiverId;
+    this.dataCodecClass = dataCodecClass;
+  }
+
+  public String getReceiverId() {
+    return receiverId;
+  }
+
+  @Override
+  public Class<? extends Codec> getDataCodecClass() {
+    return dataCodecClass;
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder("Gather Operator Spec: 
[receiver=")
+        .append(receiverId)
+        .append("] [dataCodecClass=")
+        .append(Utils.simpleName(dataCodecClass))
+        .append("]");
+    return sb.toString();
+  }
+
+  public static Builder newBuilder() {
+    return new GatherOperatorSpec.Builder();
+  }
+
+  public static class Builder implements 
org.apache.reef.util.Builder<GatherOperatorSpec> {
+
+    private String receiverId;
+    private Class<? extends Codec> dataCodecClass;
+
+    public Builder setReceiverId(final String receiverId) {
+      this.receiverId = receiverId;
+      return this;
+    }
+
+    public Builder setDataCodecClass(final Class<? extends Codec> 
dataCodecClass) {
+      this.dataCodecClass = dataCodecClass;
+      return this;
+    }
+
+    @Override
+    public GatherOperatorSpec build() {
+      return new GatherOperatorSpec(receiverId, dataCodecClass);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/ScatterOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/ScatterOperatorSpec.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/ScatterOperatorSpec.java
new file mode 100644
index 0000000..fadff14
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/config/ScatterOperatorSpec.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.config;
+
+import org.apache.reef.io.network.group.api.config.OperatorSpec;
+import org.apache.reef.io.network.group.impl.utils.Utils;
+import org.apache.reef.io.serialization.Codec;
+
+/**
+ * The specification for the Scatter operator.
+ */
+public class ScatterOperatorSpec implements OperatorSpec {
+
+  private final String senderId;
+  private final Class<? extends Codec> dataCodecClass;
+
+  public ScatterOperatorSpec(final String senderId,
+                             final Class<? extends Codec> dataCodecClass) {
+    this.senderId = senderId;
+    this.dataCodecClass = dataCodecClass;
+  }
+
+  public String getSenderId() {
+    return senderId;
+  }
+
+  @Override
+  public Class<? extends Codec> getDataCodecClass() {
+    return dataCodecClass;
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder("Scatter Operator Spec: 
[sender=")
+        .append(senderId)
+        .append("] [dataCodecClass=")
+        .append(Utils.simpleName(dataCodecClass))
+        .append("]");
+    return sb.toString();
+  }
+
+  public static Builder newBuilder() {
+    return new ScatterOperatorSpec.Builder();
+  }
+
+  public static class Builder implements 
org.apache.reef.util.Builder<ScatterOperatorSpec> {
+
+    private String senderId;
+    private Class<? extends Codec> dataCodecClass;
+
+    public Builder setSenderId(final String senderId) {
+      this.senderId = senderId;
+      return this;
+    }
+
+    public Builder setDataCodecClass(final Class<? extends Codec> 
dataCodecClass) {
+      this.dataCodecClass = dataCodecClass;
+      return this;
+    }
+
+    @Override
+    public ScatterOperatorSpec build() {
+      return new ScatterOperatorSpec(senderId, dataCodecClass);
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java
index ffc0f20..657aec8 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java
@@ -30,7 +30,9 @@ import 
org.apache.reef.io.network.group.api.driver.CommunicationGroupDriver;
 import org.apache.reef.io.network.group.api.driver.Topology;
 import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
 import org.apache.reef.io.network.group.impl.config.BroadcastOperatorSpec;
+import org.apache.reef.io.network.group.impl.config.GatherOperatorSpec;
 import org.apache.reef.io.network.group.impl.config.ReduceOperatorSpec;
+import org.apache.reef.io.network.group.impl.config.ScatterOperatorSpec;
 import 
org.apache.reef.io.network.group.impl.config.parameters.CommunicationGroupName;
 import org.apache.reef.io.network.group.impl.config.parameters.OperatorName;
 import 
org.apache.reef.io.network.group.impl.config.parameters.SerializedOperConfigs;
@@ -148,6 +150,42 @@ public class CommunicationGroupDriverImpl implements 
CommunicationGroupDriver {
   }
 
   @Override
+  public CommunicationGroupDriver addScatter(final Class<? extends 
Name<String>> operatorName,
+                                             final ScatterOperatorSpec spec) {
+    LOG.entering("CommunicationGroupDriverImpl", "addScatter",
+        new Object[]{getQualifiedName(), Utils.simpleName(operatorName), 
spec});
+    if (finalised) {
+      throw new IllegalStateException("Can't add more operators to a finalised 
spec");
+    }
+    operatorSpecs.put(operatorName, spec);
+    final Topology topology = new TreeTopology(senderStage, groupName, 
operatorName, driverId, numberOfTasks, fanOut);
+    topology.setRootTask(spec.getSenderId());
+    topology.setOperatorSpecification(spec);
+    topologies.put(operatorName, topology);
+    LOG.exiting("CommunicationGroupDriverImpl", "addScatter",
+        Arrays.toString(new Object[]{getQualifiedName(), 
Utils.simpleName(operatorName), spec}));
+    return this;
+  }
+
+  @Override
+  public CommunicationGroupDriver addGather(final Class<? extends 
Name<String>> operatorName,
+                                            final GatherOperatorSpec spec) {
+    LOG.entering("CommunicationGroupDriverImpl", "addGather",
+        new Object[]{getQualifiedName(), Utils.simpleName(operatorName), 
spec});
+    if (finalised) {
+      throw new IllegalStateException("Can't add more operators to a finalised 
spec");
+    }
+    operatorSpecs.put(operatorName, spec);
+    final Topology topology = new TreeTopology(senderStage, groupName, 
operatorName, driverId, numberOfTasks, fanOut);
+    topology.setRootTask(spec.getReceiverId());
+    topology.setOperatorSpecification(spec);
+    topologies.put(operatorName, topology);
+    LOG.exiting("CommunicationGroupDriverImpl", "addGather",
+        Arrays.toString(new Object[]{getQualifiedName(), 
Utils.simpleName(operatorName), spec}));
+    return this;
+  }
+
+  @Override
   public Configuration getTaskConfiguration(final Configuration taskConf) {
     LOG.entering("CommunicationGroupDriverImpl", "getTaskConfiguration",
         new Object[]{getQualifiedName(), confSerializer.toString(taskConf)});

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/FlatTopology.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/FlatTopology.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/FlatTopology.java
index 46d2345..a2da654 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/FlatTopology.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/FlatTopology.java
@@ -267,7 +267,7 @@ public class FlatTopology implements Topology {
     final EventHandler<List<TaskNode>> topoUpdateWaitHandler = new 
TopologyUpdateWaitHandler(senderStage, groupName,
         operName, driverId, 0,
         dstId, version,
-        getQualifiedName());
+        getQualifiedName(), TopologySerializer.encode(root));
     final EStage<List<TaskNode>> nodeTopologyUpdateWaitStage = new 
SingleThreadStage<>("NodeTopologyUpdateWaitStage",
         topoUpdateWaitHandler,
         nodes.size());

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeImpl.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeImpl.java
index f2c465c..7b68682 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeImpl.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeImpl.java
@@ -303,6 +303,13 @@ public class TaskNodeImpl implements TaskNode {
     return parent;
   }
 
+  @Override
+  public Iterable<TaskNode> getChildren() {
+    LOG.entering("TaskNodeImpl", "getChildren", getQualifiedName());
+    LOG.exiting("TaskNodeImpl", "getChildren", getQualifiedName() + children);
+    return children;
+  }
+
   private String getQualifiedName() {
     return Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + 
":(" + taskId + "," + getVersion() + ") - ";
   }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologySerializer.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologySerializer.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologySerializer.java
new file mode 100644
index 0000000..f019703
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologySerializer.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.driver;
+
+import org.apache.reef.io.network.group.api.driver.TaskNode;
+import org.apache.reef.io.network.util.Pair;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+
+import java.io.*;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Utility class for encoding a Topology into a byte array and vice versa.
+ */
+public final class TopologySerializer {
+
+  /**
+   * Shouldn't be instantiated.
+   */
+  private TopologySerializer() {
+  }
+
+  /**
+   * Recursively encode TaskNodes of a Topology into a byte array.
+   *
+   * @param root the root node of the subtree to encode
+   * @return encoded byte array
+   */
+  public static byte[] encode(final TaskNode root) {
+    try (final ByteArrayOutputStream bstream = new ByteArrayOutputStream();
+         final DataOutputStream dstream = new DataOutputStream(bstream)) {
+      encodeHelper(dstream, root);
+      return bstream.toByteArray();
+
+    } catch (final IOException e) {
+      throw new RuntimeException("Exception while encoding topology of " + 
root.getTaskId(), e);
+    }
+  }
+
+  private static void encodeHelper(final DataOutputStream dstream,
+                                   final TaskNode node) throws IOException {
+    dstream.writeUTF(node.getTaskId());
+    dstream.writeInt(node.getNumberOfChildren());
+    for (final TaskNode child : node.getChildren()) {
+      encodeHelper(dstream, child);
+    }
+  }
+
+  /**
+   * Recursively translate a byte array into a TopologySimpleNode and a list 
of task Identifiers.
+   *
+   * @param data encoded byte array
+   * @param ifac IdentifierFactory needed to generate Identifiers from String 
Ids
+   * @return decoded TopologySimpleNode and a lexicographically sorted list of 
task Identifiers
+   */
+  public static Pair<TopologySimpleNode, List<Identifier>> decode(
+      final byte[] data,
+      final IdentifierFactory ifac) {
+
+    try (final DataInputStream dstream = new DataInputStream(new 
ByteArrayInputStream(data))) {
+      final List<Identifier> activeSlaveTasks = new LinkedList<>();
+      final TopologySimpleNode retNode = decodeHelper(dstream, 
activeSlaveTasks, ifac);
+      return new Pair<>(retNode, activeSlaveTasks);
+
+    } catch (final IOException e) {
+      throw new RuntimeException("Exception while decoding message", e);
+    }
+  }
+
+  private static TopologySimpleNode decodeHelper(
+      final DataInputStream dstream,
+      final List<Identifier> activeSlaveTasks,
+      final IdentifierFactory ifac) throws IOException {
+    final String taskId = dstream.readUTF();
+    activeSlaveTasks.add(ifac.getNewInstance(taskId));
+    final TopologySimpleNode retNode = new TopologySimpleNode(taskId);
+
+    final int children = dstream.readInt();
+    for (int index = 0; index < children; index++) {
+      final TopologySimpleNode child = decodeHelper(dstream, activeSlaveTasks, 
ifac);
+      retNode.addChild(child);
+    }
+
+    return retNode;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologySimpleNode.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologySimpleNode.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologySimpleNode.java
new file mode 100644
index 0000000..0907a1b
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologySimpleNode.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.driver;
+
+import java.util.LinkedList;
+import java.util.List;
+
+public final class TopologySimpleNode {
+  private final String taskId;
+  private final List<TopologySimpleNode> children;
+
+  public TopologySimpleNode(final String taskId) {
+    this.taskId = taskId;
+    this.children = new LinkedList<>();
+  }
+
+  public void addChild(final TopologySimpleNode childNode) {
+    children.add(childNode);
+  }
+
+  public String getTaskId() {
+    return this.taskId;
+  }
+
+  public Iterable<TopologySimpleNode> getChildren() {
+    return this.children;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyUpdateWaitHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyUpdateWaitHandler.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyUpdateWaitHandler.java
index 709f866..1811e66 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyUpdateWaitHandler.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyUpdateWaitHandler.java
@@ -43,6 +43,7 @@ public class TopologyUpdateWaitHandler implements 
EventHandler<List<TaskNode>> {
   private final String dstId;
   private final int dstVersion;
   private final String qualifiedName;
+  private final byte[] data;
 
 
   /**
@@ -59,7 +60,7 @@ public class TopologyUpdateWaitHandler implements 
EventHandler<List<TaskNode>> {
                                    final Class<? extends Name<String>> 
operName,
                                    final String driverId, final int 
driverVersion,
                                    final String dstId, final int dstVersion,
-                                   final String qualifiedName) {
+                                   final String qualifiedName, final byte[] 
data) {
     super();
     this.senderStage = senderStage;
     this.groupName = groupName;
@@ -69,6 +70,7 @@ public class TopologyUpdateWaitHandler implements 
EventHandler<List<TaskNode>> {
     this.dstId = dstId;
     this.dstVersion = dstVersion;
     this.qualifiedName = qualifiedName;
+    this.data = data;
   }
 
 
@@ -90,7 +92,7 @@ public class TopologyUpdateWaitHandler implements 
EventHandler<List<TaskNode>> {
         + dstId + "," + dstVersion + ")");
     senderStage.onNext(Utils.bldVersionedGCM(groupName, operName,
         ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyUpdated, 
driverId, driverVersion, dstId,
-        dstVersion, Utils.EMPTY_BYTE_ARR));
+        dstVersion, data));
     LOG.exiting("TopologyUpdateWaitHandler", "onNext", qualifiedName);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TreeTopology.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TreeTopology.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TreeTopology.java
index 774a8f1..908316d 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TreeTopology.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TreeTopology.java
@@ -27,14 +27,13 @@ import 
org.apache.reef.io.network.group.impl.GroupChangesCodec;
 import org.apache.reef.io.network.group.impl.GroupChangesImpl;
 import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
 import org.apache.reef.io.network.group.impl.config.BroadcastOperatorSpec;
+import org.apache.reef.io.network.group.impl.config.GatherOperatorSpec;
 import org.apache.reef.io.network.group.impl.config.ReduceOperatorSpec;
+import org.apache.reef.io.network.group.impl.config.ScatterOperatorSpec;
 import org.apache.reef.io.network.group.impl.config.parameters.DataCodec;
 import 
org.apache.reef.io.network.group.impl.config.parameters.ReduceFunctionParam;
 import org.apache.reef.io.network.group.impl.config.parameters.TaskVersion;
-import org.apache.reef.io.network.group.impl.operators.BroadcastReceiver;
-import org.apache.reef.io.network.group.impl.operators.BroadcastSender;
-import org.apache.reef.io.network.group.impl.operators.ReduceReceiver;
-import org.apache.reef.io.network.group.impl.operators.ReduceSender;
+import org.apache.reef.io.network.group.impl.operators.*;
 import org.apache.reef.io.network.group.impl.utils.Utils;
 import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
 import org.apache.reef.io.serialization.Codec;
@@ -138,6 +137,20 @@ public class TreeTopology implements Topology {
       } else {
         jcb.bindImplementation(GroupCommOperator.class, ReduceSender.class);
       }
+    } else if (operatorSpec instanceof ScatterOperatorSpec) {
+      final ScatterOperatorSpec scatterOperatorSpec = (ScatterOperatorSpec) 
operatorSpec;
+      if (taskId.equals(scatterOperatorSpec.getSenderId())) {
+        jcb.bindImplementation(GroupCommOperator.class, ScatterSender.class);
+      } else {
+        jcb.bindImplementation(GroupCommOperator.class, ScatterReceiver.class);
+      }
+    } else if (operatorSpec instanceof GatherOperatorSpec) {
+      final GatherOperatorSpec gatherOperatorSpec = (GatherOperatorSpec) 
operatorSpec;
+      if (taskId.equals(gatherOperatorSpec.getReceiverId())) {
+        jcb.bindImplementation(GroupCommOperator.class, GatherReceiver.class);
+      } else {
+        jcb.bindImplementation(GroupCommOperator.class, GatherSender.class);
+      }
     }
     final Configuration retConf = jcb.build();
     LOG.exiting("TreeTopology", "getTaskConfig", getQualifiedName() + 
confSer.toString(retConf));
@@ -298,7 +311,7 @@ public class TreeTopology implements Topology {
     final EventHandler<List<TaskNode>> topoUpdateWaitHandler = new 
TopologyUpdateWaitHandler(senderStage, groupName,
         operName, driverId, 0,
         dstId, version,
-        getQualifiedName());
+        getQualifiedName(), TopologySerializer.encode(root));
     final EStage<List<TaskNode>> nodeTopologyUpdateWaitStage = new 
SingleThreadStage<>("NodeTopologyUpdateWaitStage",
         topoUpdateWaitHandler,
         nodes.size());

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastReceiver.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastReceiver.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastReceiver.java
index 74e9a83..5a3e426 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastReceiver.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastReceiver.java
@@ -135,7 +135,7 @@ public class BroadcastReceiver<T> implements 
Broadcast.Receiver<T>, EventHandler
     LOG.fine(this + " Waiting to receive broadcast");
     final byte[] data;
     try {
-      data = topology.recvFromParent();
+      data = 
topology.recvFromParent(ReefNetworkGroupCommProtos.GroupCommMessage.Type.Broadcast);
       // TODO: Should receive the identity element instead of null
       if (data == null) {
         LOG.fine(this + " Received null. Perhaps one of my ancestors is 
dead.");

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/GatherReceiver.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/GatherReceiver.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/GatherReceiver.java
new file mode 100644
index 0000000..4b217f6
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/GatherReceiver.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.operators;
+
+import org.apache.reef.driver.parameters.DriverIdentifier;
+import org.apache.reef.driver.task.TaskConfigurationOptions;
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.exception.ParentDeadException;
+import org.apache.reef.io.network.group.api.operators.Gather;
+import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler;
+import 
org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient;
+import org.apache.reef.io.network.group.api.task.OperatorTopology;
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import 
org.apache.reef.io.network.group.impl.config.parameters.CommunicationGroupName;
+import org.apache.reef.io.network.group.impl.config.parameters.DataCodec;
+import org.apache.reef.io.network.group.impl.config.parameters.OperatorName;
+import org.apache.reef.io.network.group.impl.config.parameters.TaskVersion;
+import org.apache.reef.io.network.group.impl.task.OperatorTopologyImpl;
+import org.apache.reef.io.network.group.impl.utils.Utils;
+import org.apache.reef.io.network.impl.NetworkService;
+import org.apache.reef.io.serialization.Codec;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.Identifier;
+
+import javax.inject.Inject;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Logger;
+
+public class GatherReceiver<T> implements Gather.Receiver<T>, 
EventHandler<GroupCommunicationMessage> {
+
+  private static final Logger LOG = 
Logger.getLogger(GatherReceiver.class.getName());
+
+  private final Class<? extends Name<String>> groupName;
+  private final Class<? extends Name<String>> operName;
+  private final Codec<T> dataCodec;
+  private final OperatorTopology topology;
+  private final CommunicationGroupServiceClient commGroupClient;
+  private final AtomicBoolean init = new AtomicBoolean(false);
+  private final int version;
+
+  @Inject
+  public GatherReceiver(@Parameter(CommunicationGroupName.class) final String 
groupName,
+                        @Parameter(OperatorName.class) final String operName,
+                        @Parameter(TaskConfigurationOptions.Identifier.class) 
final String selfId,
+                        @Parameter(DataCodec.class) final Codec<T> dataCodec,
+                        @Parameter(DriverIdentifier.class) final String 
driverId,
+                        @Parameter(TaskVersion.class) final int version,
+                        final CommGroupNetworkHandler commGroupNetworkHandler,
+                        final NetworkService<GroupCommunicationMessage> 
netService,
+                        final CommunicationGroupServiceClient commGroupClient) 
{
+    LOG.finest(operName + " has CommGroupHandler-" + 
commGroupNetworkHandler.toString());
+    this.version = version;
+    this.groupName = Utils.getClass(groupName);
+    this.operName = Utils.getClass(operName);
+    this.dataCodec = dataCodec;
+    this.topology = new OperatorTopologyImpl(this.groupName, this.operName,
+                                             selfId, driverId, new 
Sender(netService), version);
+    this.commGroupClient = commGroupClient;
+    commGroupNetworkHandler.register(this.operName, this);
+  }
+
+  @Override
+  public int getVersion() {
+    return version;
+  }
+
+  @Override
+  public void initialize() throws ParentDeadException {
+    topology.initialize();
+  }
+
+  @Override
+  public Class<? extends Name<String>> getOperName() {
+    return operName;
+  }
+
+  @Override
+  public Class<? extends Name<String>> getGroupName() {
+    return groupName;
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder("GatherReceiver:")
+        .append(Utils.simpleName(groupName))
+        .append(":")
+        .append(Utils.simpleName(operName))
+        .append(":")
+        .append(version);
+    return sb.toString();
+  }
+
+  @Override
+  public void onNext(final GroupCommunicationMessage msg) {
+    topology.handle(msg);
+  }
+
+  @Override
+  public List<T> receive() throws NetworkException, InterruptedException {
+    LOG.entering("GatherReceiver", "receive");
+    final Map<String, T> mapOfTaskIdToData = receiveMapOfTaskIdToData();
+
+    LOG.fine(this + " Sorting data according to lexicographical order of task 
identifiers.");
+    final TreeMap<String, T> sortedMapOfTaskIdToData = new 
TreeMap<>(mapOfTaskIdToData);
+    final List<T> retList = new LinkedList<>(sortedMapOfTaskIdToData.values());
+
+    LOG.exiting("GatherReceiver", "receive", retList);
+    return retList;
+  }
+
+  @Override
+  public List<T> receive(final List<? extends Identifier> order) throws 
NetworkException, InterruptedException {
+    LOG.entering("GatherReceiver", "receive", order);
+    final Map<String, T> mapOfTaskIdToData = receiveMapOfTaskIdToData();
+
+    LOG.fine(this + " Sorting data according to specified order of task 
identifiers.");
+    final List<T> retList = new LinkedList<>();
+    for (final Identifier key : order) {
+      final String keyString = key.toString();
+      if (mapOfTaskIdToData.containsKey(keyString)) {
+        retList.add(mapOfTaskIdToData.get(key.toString()));
+      } else {
+        LOG.warning(this + " Received no data from " + keyString + ". Adding 
null.");
+        retList.add(null);
+      }
+    }
+
+    LOG.exiting("GatherReceiver", "receive", retList);
+    return retList;
+  }
+
+  private Map<String, T> receiveMapOfTaskIdToData() {
+    LOG.entering("GatherReceiver", "receiveMapOfTaskIdToData");
+    // I am root.
+    LOG.fine("I am " + this);
+
+    if (init.compareAndSet(false, true)) {
+      LOG.fine(this + " Communication group initializing.");
+      commGroupClient.initialize();
+      LOG.fine(this + " Communication group initialized.");
+    }
+
+    final Map<String, T> mapOfTaskIdToData = new HashMap<>();
+    try {
+      LOG.fine(this + " Waiting for children.");
+      final byte[] gatheredDataFromChildren = topology.recvFromChildren();
+
+      LOG.fine("Using " + dataCodec.getClass().getSimpleName() + " as codec.");
+      try (final ByteArrayInputStream bstream = new 
ByteArrayInputStream(gatheredDataFromChildren);
+           final DataInputStream dstream = new DataInputStream(bstream)) {
+        while (dstream.available() > 0) {
+          final String identifier = dstream.readUTF();
+          final int dataLength = dstream.readInt();
+          final byte[] data = new byte[dataLength];
+          dstream.readFully(data);
+          mapOfTaskIdToData.put(identifier, dataCodec.decode(data));
+        }
+        LOG.fine(this + " Successfully received gathered data.");
+      }
+
+    } catch (final ParentDeadException e) {
+      throw new RuntimeException("ParentDeadException", e);
+    } catch (final IOException e) {
+      throw new RuntimeException("IOException", e);
+    }
+
+    LOG.exiting("GatherReceiver", "receiveMapOfTaskIdToData", 
Arrays.toString(new Object[]{mapOfTaskIdToData, this}));
+    return mapOfTaskIdToData;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/GatherSender.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/GatherSender.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/GatherSender.java
new file mode 100644
index 0000000..a657b07
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/GatherSender.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.operators;
+
+import org.apache.reef.driver.parameters.DriverIdentifier;
+import org.apache.reef.driver.task.TaskConfigurationOptions;
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.exception.ParentDeadException;
+import org.apache.reef.io.network.group.api.operators.Gather;
+import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler;
+import 
org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient;
+import org.apache.reef.io.network.group.api.task.OperatorTopology;
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import 
org.apache.reef.io.network.group.impl.config.parameters.CommunicationGroupName;
+import org.apache.reef.io.network.group.impl.config.parameters.DataCodec;
+import org.apache.reef.io.network.group.impl.config.parameters.OperatorName;
+import org.apache.reef.io.network.group.impl.config.parameters.TaskVersion;
+import org.apache.reef.io.network.group.impl.task.OperatorTopologyImpl;
+import org.apache.reef.io.network.group.impl.utils.Utils;
+import org.apache.reef.io.network.impl.NetworkService;
+import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
+import org.apache.reef.io.serialization.Codec;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Logger;
+
+public class GatherSender<T> implements Gather.Sender<T>, 
EventHandler<GroupCommunicationMessage> {
+
+  private static final Logger LOG = 
Logger.getLogger(GatherSender.class.getName());
+
+  private final Class<? extends Name<String>> groupName;
+  private final Class<? extends Name<String>> operName;
+  private final Codec<T> dataCodec;
+  private final NetworkService<GroupCommunicationMessage> netService;
+  private final OperatorTopology topology;
+  private final CommunicationGroupServiceClient commGroupClient;
+  private final AtomicBoolean init = new AtomicBoolean(false);
+  private final int version;
+
+  @Inject
+  public GatherSender(@Parameter(CommunicationGroupName.class) final String 
groupName,
+                      @Parameter(OperatorName.class) final String operName,
+                      @Parameter(TaskConfigurationOptions.Identifier.class) 
final String selfId,
+                      @Parameter(DataCodec.class) final Codec<T> dataCodec,
+                      @Parameter(DriverIdentifier.class) final String driverId,
+                      @Parameter(TaskVersion.class) final int version,
+                      final CommGroupNetworkHandler commGroupNetworkHandler,
+                      final NetworkService<GroupCommunicationMessage> 
netService,
+                      final CommunicationGroupServiceClient commGroupClient) {
+    LOG.finest(operName + "has CommGroupHandler-" + 
commGroupNetworkHandler.toString());
+    this.version = version;
+    this.groupName = Utils.getClass(groupName);
+    this.operName = Utils.getClass(operName);
+    this.dataCodec = dataCodec;
+    this.netService = netService;
+    this.topology = new OperatorTopologyImpl(this.groupName, this.operName,
+                                             selfId, driverId, new 
Sender(netService), version);
+    this.commGroupClient = commGroupClient;
+    commGroupNetworkHandler.register(this.operName, this);
+  }
+
+  @Override
+  public int getVersion() {
+    return version;
+  }
+
+  @Override
+  public void initialize() throws ParentDeadException {
+    topology.initialize();
+  }
+
+  @Override
+  public Class<? extends Name<String>> getOperName() {
+    return operName;
+  }
+
+  @Override
+  public Class<? extends Name<String>> getGroupName() {
+    return groupName;
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder("GatherSender:")
+        .append(Utils.simpleName(groupName))
+        .append(":")
+        .append(Utils.simpleName(operName))
+        .append(":")
+        .append(version);
+    return sb.toString();
+  }
+
+  @Override
+  public void onNext(final GroupCommunicationMessage msg) {
+    topology.handle(msg);
+  }
+
+  @Override
+  public void send(final T myData) throws NetworkException, 
InterruptedException {
+    LOG.entering("GatherSender", "send", myData);
+    // I am an intermediate node or a leaf.
+    LOG.fine("I am " + this);
+
+    if (init.compareAndSet(false, true)) {
+      LOG.fine(this + " Communication group initializing.");
+      commGroupClient.initialize();
+      LOG.fine(this + " Communication group initialized.");
+    }
+
+    try {
+      LOG.finest(this + " Waiting for children.");
+      final byte[] gatheredData = topology.recvFromChildren();
+      final byte[] encodedMyData = dataCodec.encode(myData);
+
+      try (final ByteArrayOutputStream bstream = new ByteArrayOutputStream();
+           final DataOutputStream dstream = new DataOutputStream(bstream)) {
+        dstream.writeUTF(netService.getMyId().toString());
+        dstream.writeInt(encodedMyData.length);
+        dstream.write(encodedMyData);
+        dstream.write(gatheredData);
+        final byte[] mergedData = bstream.toByteArray();
+
+        LOG.fine(this + " Sending merged value to parent.");
+        topology.sendToParent(mergedData, 
ReefNetworkGroupCommProtos.GroupCommMessage.Type.Gather);
+      }
+    } catch (final ParentDeadException e) {
+      throw new RuntimeException("ParentDeadException", e);
+    } catch (final IOException e) {
+      throw new RuntimeException("IOException", e);
+    }
+    LOG.exiting("GatherSender", "send");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ScatterReceiver.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ScatterReceiver.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ScatterReceiver.java
new file mode 100644
index 0000000..1c814fb
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ScatterReceiver.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.operators;
+
+import org.apache.reef.driver.parameters.DriverIdentifier;
+import org.apache.reef.driver.task.TaskConfigurationOptions;
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.exception.ParentDeadException;
+import org.apache.reef.io.network.group.api.operators.Scatter;
+import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler;
+import 
org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient;
+import org.apache.reef.io.network.group.api.task.OperatorTopology;
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import 
org.apache.reef.io.network.group.impl.config.parameters.CommunicationGroupName;
+import org.apache.reef.io.network.group.impl.config.parameters.DataCodec;
+import org.apache.reef.io.network.group.impl.config.parameters.OperatorName;
+import org.apache.reef.io.network.group.impl.config.parameters.TaskVersion;
+import org.apache.reef.io.network.group.impl.task.OperatorTopologyImpl;
+import org.apache.reef.io.network.group.impl.utils.ScatterData;
+import org.apache.reef.io.network.group.impl.utils.ScatterDecoder;
+import org.apache.reef.io.network.group.impl.utils.Utils;
+import org.apache.reef.io.network.impl.NetworkService;
+import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
+import org.apache.reef.io.serialization.Codec;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Logger;
+
+public final class ScatterReceiver<T> implements Scatter.Receiver<T>, 
EventHandler<GroupCommunicationMessage> {
+
+  private static final Logger LOG = 
Logger.getLogger(ScatterReceiver.class.getName());
+
+  private final Class<? extends Name<String>> groupName;
+  private final Class<? extends Name<String>> operName;
+  private final Codec<T> dataCodec;
+  private final OperatorTopology topology;
+  private final AtomicBoolean init = new AtomicBoolean(false);
+  private final CommunicationGroupServiceClient commGroupClient;
+  private final int version;
+  private final ScatterDecoder scatterDecoder;
+
+  @Inject
+  public ScatterReceiver(@Parameter(CommunicationGroupName.class) final String 
groupName,
+                         @Parameter(OperatorName.class) final String operName,
+                         @Parameter(TaskConfigurationOptions.Identifier.class) 
final String selfId,
+                         @Parameter(DataCodec.class) final Codec<T> dataCodec,
+                         @Parameter(DriverIdentifier.class) final String 
driverId,
+                         @Parameter(TaskVersion.class) final int version,
+                         final CommGroupNetworkHandler commGroupNetworkHandler,
+                         final NetworkService<GroupCommunicationMessage> 
netService,
+                         final CommunicationGroupServiceClient commGroupClient,
+                         final ScatterDecoder scatterDecoder) {
+    LOG.finest(operName + "has CommGroupHandler-" + 
commGroupNetworkHandler.toString());
+    this.version = version;
+    this.groupName = Utils.getClass(groupName);
+    this.operName = Utils.getClass(operName);
+    this.dataCodec = dataCodec;
+    this.scatterDecoder = scatterDecoder;
+    this.topology = new OperatorTopologyImpl(this.groupName, this.operName,
+                                             selfId, driverId, new 
Sender(netService), version);
+    this.commGroupClient = commGroupClient;
+    commGroupNetworkHandler.register(this.operName, this);
+  }
+
+  @Override
+  public int getVersion() {
+    return version;
+  }
+
+  @Override
+  public void initialize() throws ParentDeadException {
+    topology.initialize();
+  }
+
+  @Override
+  public Class<? extends Name<String>> getOperName() {
+    return operName;
+  }
+
+  @Override
+  public Class<? extends Name<String>> getGroupName() {
+    return groupName;
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder("ScatterReceiver:")
+        .append(Utils.simpleName(groupName))
+        .append(":")
+        .append(Utils.simpleName(operName))
+        .append(":")
+        .append(version);
+    return sb.toString();
+  }
+
+  @Override
+  public void onNext(final GroupCommunicationMessage msg) {
+    topology.handle(msg);
+  }
+
+  @Override
+  public List<T> receive() throws NetworkException, InterruptedException {
+    LOG.entering("ScatterReceiver", "receive");
+    // I am intermediate node or leaf.
+    LOG.fine("I am " + this);
+
+    if (init.compareAndSet(false, true)) {
+      LOG.fine(this + " Communication group initializing.");
+      commGroupClient.initialize();
+      LOG.fine(this + " Communication group initialized.");
+    }
+
+    try {
+      LOG.fine(this + " Waiting to receive scatter from parent.");
+      final byte[] data = 
topology.recvFromParent(ReefNetworkGroupCommProtos.GroupCommMessage.Type.Scatter);
+
+      if (data == null) {
+        LOG.fine(this + " Received null. Perhaps one of my ancestors is 
dead.");
+        LOG.exiting("ScatterSender", "receive", null);
+        return null;
+      }
+
+      LOG.fine(this + " Successfully received scattered data.");
+      final ScatterData scatterData = scatterDecoder.decode(data);
+
+      LOG.fine(this + " Trying to propagate messages to children.");
+      topology.sendToChildren(scatterData.getChildrenData(), 
ReefNetworkGroupCommProtos.GroupCommMessage.Type.Scatter);
+
+      LOG.fine(this + " Decoding data elements sent to me.");
+      final List<T> retList = new LinkedList<>();
+      for (final byte[] singleData : scatterData.getMyData()) {
+        retList.add(dataCodec.decode(singleData));
+      }
+
+      LOG.exiting("ScatterSender", "receive", retList);
+      return retList;
+
+    } catch (final ParentDeadException e) {
+      throw new RuntimeException("ParentDeadException", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ScatterSender.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ScatterSender.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ScatterSender.java
new file mode 100644
index 0000000..bef15f4
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ScatterSender.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.operators;
+
+import org.apache.reef.driver.parameters.DriverIdentifier;
+import org.apache.reef.driver.task.TaskConfigurationOptions;
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.exception.ParentDeadException;
+import org.apache.reef.io.network.group.api.operators.Scatter;
+import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler;
+import 
org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient;
+import org.apache.reef.io.network.group.api.task.OperatorTopology;
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import 
org.apache.reef.io.network.group.impl.config.parameters.CommunicationGroupName;
+import org.apache.reef.io.network.group.impl.config.parameters.DataCodec;
+import org.apache.reef.io.network.group.impl.config.parameters.OperatorName;
+import org.apache.reef.io.network.group.impl.config.parameters.TaskVersion;
+import org.apache.reef.io.network.group.impl.task.OperatorTopologyImpl;
+import org.apache.reef.io.network.group.impl.utils.ScatterEncoder;
+import org.apache.reef.io.network.group.impl.utils.ScatterHelper;
+import org.apache.reef.io.network.group.impl.utils.Utils;
+import org.apache.reef.io.network.impl.NetworkService;
+import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
+import org.apache.reef.io.serialization.Codec;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.Identifier;
+
+import javax.inject.Inject;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Logger;
+
+public final class ScatterSender<T> implements Scatter.Sender<T>, 
EventHandler<GroupCommunicationMessage> {
+
+  private static final Logger LOG = 
Logger.getLogger(ScatterSender.class.getName());
+
+  private final Class<? extends Name<String>> groupName;
+  private final Class<? extends Name<String>> operName;
+  private final Codec<T> dataCodec;
+  private final OperatorTopology topology;
+  private final AtomicBoolean init = new AtomicBoolean(false);
+  private final CommunicationGroupServiceClient commGroupClient;
+  private final int version;
+  private final ScatterEncoder scatterEncoder;
+
+  @Inject
+  public ScatterSender(@Parameter(CommunicationGroupName.class) final String 
groupName,
+                       @Parameter(OperatorName.class) final String operName,
+                       @Parameter(TaskConfigurationOptions.Identifier.class) 
final String selfId,
+                       @Parameter(DataCodec.class) final Codec<T> dataCodec,
+                       @Parameter(DriverIdentifier.class) final String 
driverId,
+                       @Parameter(TaskVersion.class) final int version,
+                       final CommGroupNetworkHandler commGroupNetworkHandler,
+                       final NetworkService<GroupCommunicationMessage> 
netService,
+                       final CommunicationGroupServiceClient commGroupClient,
+                       final ScatterEncoder scatterEncoder) {
+    LOG.finest(operName + "has CommGroupHandler-" + 
commGroupNetworkHandler.toString());
+    this.version = version;
+    this.groupName = Utils.getClass(groupName);
+    this.operName = Utils.getClass(operName);
+    this.dataCodec = dataCodec;
+    this.scatterEncoder = scatterEncoder;
+    this.topology = new OperatorTopologyImpl(this.groupName, this.operName,
+                                             selfId, driverId, new 
Sender(netService), version);
+    this.commGroupClient = commGroupClient;
+    commGroupNetworkHandler.register(this.operName, this);
+  }
+
+  @Override
+  public int getVersion() {
+    return version;
+  }
+
+  @Override
+  public void initialize() throws ParentDeadException {
+    topology.initialize();
+  }
+
+  @Override
+  public Class<? extends Name<String>> getOperName() {
+    return operName;
+  }
+
+  @Override
+  public Class<? extends Name<String>> getGroupName() {
+    return groupName;
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder("ScatterSender:")
+        .append(Utils.simpleName(groupName))
+        .append(":")
+        .append(Utils.simpleName(operName))
+        .append(":")
+        .append(version);
+    return sb.toString();
+  }
+
+  @Override
+  public void onNext(final GroupCommunicationMessage msg) {
+    topology.handle(msg);
+  }
+
+  private void initializeGroup() {
+    if (init.compareAndSet(false, true)) {
+      LOG.fine(this + " Communication group initializing.");
+      commGroupClient.initialize();
+      LOG.fine(this + " Communication group initialized.");
+    }
+  }
+
+  @Override
+  public void send(final List<T> elements) throws NetworkException, 
InterruptedException {
+    LOG.entering("ScatterSender", "send", elements);
+
+    initializeGroup();
+    send(elements,
+        ScatterHelper.getUniformCounts(elements.size(), 
commGroupClient.getActiveSlaveTasks().size()),
+        commGroupClient.getActiveSlaveTasks());
+
+    LOG.exiting("ScatterSender", "send", elements);
+  }
+
+  @Override
+  public void send(final List<T> elements, final Integer... counts)
+      throws NetworkException, InterruptedException {
+    LOG.entering("ScatterSender", "send", new Object[]{elements, counts});
+
+    initializeGroup();
+    if (counts.length != commGroupClient.getActiveSlaveTasks().size()) {
+      throw new RuntimeException("Parameter 'counts' has length " + 
counts.length
+          + ", but number of slaves is " + 
commGroupClient.getActiveSlaveTasks().size());
+    }
+
+    send(elements,
+        Arrays.asList(counts),
+        commGroupClient.getActiveSlaveTasks());
+
+    LOG.exiting("ScatterSender", "send", Arrays.toString(new 
Object[]{elements, counts}));
+  }
+
+  @Override
+  public void send(final List<T> elements, final List<? extends Identifier> 
order)
+      throws NetworkException, InterruptedException {
+    LOG.entering("ScatterSender", "send", new Object[]{elements, order});
+
+    initializeGroup();
+    send(elements,
+        ScatterHelper.getUniformCounts(elements.size(), order.size()),
+        order);
+
+    LOG.exiting("ScatterSender", "send", Arrays.toString(new 
Object[]{elements, order}));
+  }
+
+  @Override
+  public void send(final List<T> elements, final List<Integer> counts, final 
List<? extends Identifier> order)
+      throws NetworkException, InterruptedException {
+    LOG.entering("ScatterSender", "send", new Object[]{elements, counts, 
order});
+
+    if (counts.size() != order.size()) {
+      throw new RuntimeException("Parameter 'counts' has size " + counts.size()
+          + ", but parameter 'order' has size " + order.size() + ".");
+    }
+    initializeGroup();
+
+    // I am root.
+    LOG.fine("I am " + this);
+
+    LOG.fine(this + " Encoding data and determining which Tasks receive which 
elements.");
+    final Map<String, byte[]> mapOfChildIdToBytes = 
scatterEncoder.encode(elements, counts, order, dataCodec);
+
+    try {
+      LOG.fine(this + " Sending " + elements.size() + " elements.");
+      topology.sendToChildren(mapOfChildIdToBytes, 
ReefNetworkGroupCommProtos.GroupCommMessage.Type.Scatter);
+
+    } catch (final ParentDeadException e) {
+      throw new RuntimeException("ParentDeadException during 
OperatorTopology.sendToChildren()", e);
+    }
+
+    LOG.exiting("ScatterSender", "send", Arrays.toString(new Object[]{this, 
elements, counts, order}));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommunicationGroupClientImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommunicationGroupClientImpl.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommunicationGroupClientImpl.java
index f9143d9..8482268 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommunicationGroupClientImpl.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommunicationGroupClientImpl.java
@@ -21,9 +21,9 @@ package org.apache.reef.io.network.group.impl.task;
 import org.apache.reef.driver.parameters.DriverIdentifier;
 import org.apache.reef.driver.task.TaskConfigurationOptions;
 import org.apache.reef.exception.evaluator.NetworkException;
-import org.apache.reef.io.network.group.api.operators.Broadcast;
-import org.apache.reef.io.network.group.api.operators.GroupCommOperator;
-import org.apache.reef.io.network.group.api.operators.Reduce;
+import org.apache.reef.io.network.group.api.operators.*;
+import org.apache.reef.io.network.group.impl.driver.TopologySimpleNode;
+import org.apache.reef.io.network.group.impl.driver.TopologySerializer;
 import org.apache.reef.io.network.impl.NetworkService;
 import org.apache.reef.io.network.group.api.GroupChanges;
 import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler;
@@ -38,6 +38,7 @@ import 
org.apache.reef.io.network.group.impl.config.parameters.SerializedOperCon
 import org.apache.reef.io.network.group.impl.operators.Sender;
 import org.apache.reef.io.network.group.impl.utils.Utils;
 import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
+import org.apache.reef.io.network.util.Pair;
 import org.apache.reef.io.serialization.Codec;
 import org.apache.reef.tang.Configuration;
 import org.apache.reef.tang.Injector;
@@ -47,6 +48,8 @@ import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.tang.exceptions.InjectionException;
 import org.apache.reef.tang.formats.ConfigurationSerializer;
 import org.apache.reef.wake.EStage;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
 import org.apache.reef.wake.impl.ThreadPoolStage;
 
 import javax.inject.Inject;
@@ -65,6 +68,10 @@ public class CommunicationGroupClientImpl implements 
CommunicationGroupServiceCl
   private final Sender sender;
 
   private final String taskId;
+  private final boolean isScatterSender;
+  private final IdentifierFactory identifierFactory;
+  private List<Identifier> activeSlaveTasks;
+  private TopologySimpleNode topologySimpleNodeRoot;
 
   private final String driverId;
 
@@ -83,6 +90,7 @@ public class CommunicationGroupClientImpl implements 
CommunicationGroupServiceCl
     this.taskId = taskId;
     this.driverId = driverId;
     LOG.finest(groupName + " has GroupCommHandler-" + 
groupCommNetworkHandler.toString());
+    this.identifierFactory = netService.getIdentifierFactory();
     this.groupName = Utils.getClass(groupName);
     this.groupCommNetworkHandler = groupCommNetworkHandler;
     this.sender = new Sender(netService);
@@ -99,6 +107,7 @@ public class CommunicationGroupClientImpl implements 
CommunicationGroupServiceCl
       this.commGroupNetworkHandler = 
Tang.Factory.getTang().newInjector().getInstance(CommGroupNetworkHandler.class);
       this.groupCommNetworkHandler.register(this.groupName, 
commGroupNetworkHandler);
 
+      boolean operatorIsScatterSender = false;
       for (final String operatorConfigStr : operatorConfigs) {
 
         final Configuration operatorConfig = 
configSerializer.fromString(operatorConfigStr);
@@ -114,7 +123,13 @@ public class CommunicationGroupClientImpl implements 
CommunicationGroupServiceCl
         final String operName = injector.getNamedInstance(OperatorName.class);
         this.operators.put(Utils.getClass(operName), operator);
         LOG.finest(operName + " has CommGroupHandler-" + 
commGroupNetworkHandler.toString());
+
+        if (!operatorIsScatterSender && operator instanceof Scatter.Sender) {
+          LOG.fine(operName + " is a scatter sender. Will keep track of active 
slave tasks.");
+          operatorIsScatterSender = true;
+        }
       }
+      this.isScatterSender = operatorIsScatterSender;
     } catch (final InjectionException | IOException e) {
       throw new RuntimeException("Unable to deserialize operator config", e);
     }
@@ -147,6 +162,33 @@ public class CommunicationGroupClientImpl implements 
CommunicationGroupServiceCl
   }
 
   @Override
+  public Scatter.Sender getScatterSender(final Class<? extends Name<String>> 
operatorName) {
+    LOG.entering("CommunicationGroupClientImpl", "getScatterSender", new 
Object[]{getQualifiedName(),
+        Utils.simpleName(operatorName)});
+    final GroupCommOperator op = operators.get(operatorName);
+    if (!(op instanceof Scatter.Sender)) {
+      throw new RuntimeException("Configured operator is not a scatter 
sender");
+    }
+    commGroupNetworkHandler.addTopologyElement(operatorName);
+    LOG.exiting("CommunicationGroupClientImpl", "getScatterSender", 
getQualifiedName() + op);
+    return (Scatter.Sender) op;
+  }
+
+  @Override
+  public Gather.Receiver getGatherReceiver(final Class<? extends Name<String>> 
operatorName) {
+    LOG.entering("CommunicationGroupClientImpl", "getGatherReceiver", new 
Object[]{getQualifiedName(),
+        Utils.simpleName(operatorName)});
+    final GroupCommOperator op = operators.get(operatorName);
+    if (!(op instanceof Gather.Receiver)) {
+      throw new RuntimeException("Configured operator is not a gather 
receiver");
+    }
+    commGroupNetworkHandler.addTopologyElement(operatorName);
+    LOG.exiting("CommunicationGroupClientImpl", "getGatherReceiver", 
getQualifiedName() + op);
+    return (Gather.Receiver) op;
+  }
+
+
+  @Override
   public Broadcast.Receiver getBroadcastReceiver(final Class<? extends 
Name<String>> operatorName) {
     LOG.entering("CommunicationGroupClientImpl", "getBroadcastReceiver", new 
Object[]{getQualifiedName(),
         Utils.simpleName(operatorName)});
@@ -173,6 +215,32 @@ public class CommunicationGroupClientImpl implements 
CommunicationGroupServiceCl
   }
 
   @Override
+  public Scatter.Receiver getScatterReceiver(final Class<? extends 
Name<String>> operatorName) {
+    LOG.entering("CommunicationGroupClientImpl", "getScatterReceiver", new 
Object[]{getQualifiedName(),
+        Utils.simpleName(operatorName)});
+    final GroupCommOperator op = operators.get(operatorName);
+    if (!(op instanceof Scatter.Receiver)) {
+      throw new RuntimeException("Configured operator is not a scatter 
receiver");
+    }
+    commGroupNetworkHandler.addTopologyElement(operatorName);
+    LOG.exiting("CommunicationGroupClientImpl", "getScatterReceiver", 
getQualifiedName() + op);
+    return (Scatter.Receiver) op;
+  }
+
+  @Override
+  public Gather.Sender getGatherSender(final Class<? extends Name<String>> 
operatorName) {
+    LOG.entering("CommunicationGroupClientImpl", "getGatherSender", new 
Object[]{getQualifiedName(),
+        Utils.simpleName(operatorName)});
+    final GroupCommOperator op = operators.get(operatorName);
+    if (!(op instanceof Gather.Sender)) {
+      throw new RuntimeException("Configured operator is not a gather sender");
+    }
+    commGroupNetworkHandler.addTopologyElement(operatorName);
+    LOG.exiting("CommunicationGroupClientImpl", "getGatherSender", 
getQualifiedName() + op);
+    return (Gather.Sender) op;
+  }
+
+  @Override
   public void initialize() {
     LOG.entering("CommunicationGroupClientImpl", "initialize", 
getQualifiedName());
     if (init.compareAndSet(false, true)) {
@@ -191,6 +259,10 @@ public class CommunicationGroupClientImpl implements 
CommunicationGroupServiceCl
         throw new RuntimeException("InterruptedException while waiting for 
initialization", e);
       }
 
+      if (isScatterSender) {
+        updateTopology();
+      }
+
       if (initHandler.getException() != null) {
         throw new RuntimeException(getQualifiedName() + "Parent dead. Current 
behavior is for the child to die too.");
       }
@@ -261,10 +333,36 @@ public class CommunicationGroupClientImpl implements 
CommunicationGroupServiceCl
       do {
         msg = commGroupNetworkHandler.waitForTopologyUpdate(operName);
       } while (!isMsgVersionOk(msg));
+
+      if (isScatterSender) {
+        updateActiveTasks(msg);
+      }
     }
     LOG.exiting("CommunicationGroupClientImpl", "updateTopology", 
getQualifiedName());
   }
 
+  private void updateActiveTasks(final GroupCommunicationMessage msg) {
+    LOG.entering("CommunicationGroupClientImpl", "updateActiveTasks", new 
Object[]{getQualifiedName(), msg});
+
+    final Pair<TopologySimpleNode, List<Identifier>> pair =
+        TopologySerializer.decode(msg.getData()[0], identifierFactory);
+
+    topologySimpleNodeRoot = pair.getFirst();
+
+    activeSlaveTasks = pair.getSecond();
+    // remove myself
+    activeSlaveTasks.remove(identifierFactory.getNewInstance(taskId));
+    // sort the tasks in lexicographical order on task ids
+    Collections.sort(activeSlaveTasks, new Comparator<Identifier>() {
+      @Override
+      public int compare(final Identifier o1, final Identifier o2) {
+        return o1.toString().compareTo(o2.toString());
+      }
+    });
+
+    LOG.exiting("CommunicationGroupClientImpl", "updateActiveTasks", new 
Object[]{getQualifiedName(), msg});
+  }
+
   private boolean isMsgVersionOk(final GroupCommunicationMessage msg) {
     LOG.entering("CommunicationGroupClientImpl", "isMsgVersionOk", new 
Object[]{getQualifiedName(), msg});
     if (msg.hasVersion()) {
@@ -287,6 +385,16 @@ public class CommunicationGroupClientImpl implements 
CommunicationGroupServiceCl
     }
   }
 
+  @Override
+  public List<Identifier> getActiveSlaveTasks() {
+    return this.activeSlaveTasks;
+  }
+
+  @Override
+  public TopologySimpleNode getTopologySimpleNodeRoot() {
+    return this.topologySimpleNodeRoot;
+  }
+
   private String getQualifiedName() {
     return Utils.simpleName(groupName) + " ";
   }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyImpl.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyImpl.java
index 308c33c..d64fa5c 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyImpl.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyImpl.java
@@ -37,6 +37,7 @@ import org.apache.reef.wake.impl.SingleThreadStage;
 import javax.inject.Inject;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -218,11 +219,24 @@ public class OperatorTopologyImpl implements 
OperatorTopology {
   }
 
   @Override
-  public byte[] recvFromParent() throws ParentDeadException {
+  public void sendToChildren(final Map<String, byte[]> dataMap,
+                             final 
ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType)
+      throws ParentDeadException {
+    LOG.entering("OperatorTopologyImpl", "sendToChildren", new 
Object[]{getQualifiedName(), dataMap, msgType});
+    refreshEffectiveTopology();
+    assert (effectiveTopology != null);
+    effectiveTopology.sendToChildren(dataMap, msgType);
+    LOG.exiting("OperatorTopologyImpl", "sendToChildren",
+        Arrays.toString(new Object[]{getQualifiedName(), dataMap, msgType}));
+  }
+
+  @Override
+  public byte[] recvFromParent(final 
ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType)
+      throws ParentDeadException {
     LOG.entering("OperatorTopologyImpl", "recvFromParent", getQualifiedName());
     refreshEffectiveTopology();
     assert (effectiveTopology != null);
-    final byte[] retVal = effectiveTopology.recvFromParent();
+    final byte[] retVal = effectiveTopology.recvFromParent(msgType);
     LOG.exiting("OperatorTopologyImpl", "recvFromParent", Arrays.toString(new 
Object[]{getQualifiedName(), retVal}));
     return retVal;
   }
@@ -238,6 +252,16 @@ public class OperatorTopologyImpl implements 
OperatorTopology {
     return retVal;
   }
 
+  @Override
+  public byte[] recvFromChildren() throws ParentDeadException {
+    LOG.entering("OperatorTopologyImpl", "recvFromChildren", 
getQualifiedName());
+    refreshEffectiveTopology();
+    assert (effectiveTopology != null);
+    final byte[] retVal = effectiveTopology.recvFromChildren();
+    LOG.exiting("OperatorTopologyImpl", "recvFromChildren", 
Arrays.toString(new Object[]{getQualifiedName(), retVal}));
+    return retVal;
+  }
+
   /**
    * Only refreshes the effective topology with deletion msgs from.
    * deletionDeltas queue


Reply via email to