Repository: incubator-reef
Updated Branches:
  refs/heads/master 0911c0832 -> 6c6ad3367


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyStructImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyStructImpl.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyStructImpl.java
new file mode 100644
index 0000000..e3edb01
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyStructImpl.java
@@ -0,0 +1,579 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.task;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.group.api.operators.Reduce.ReduceFunction;
+import org.apache.reef.io.network.group.api.task.NodeStruct;
+import org.apache.reef.io.network.group.api.task.OperatorTopologyStruct;
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import org.apache.reef.io.network.group.impl.operators.Sender;
+import org.apache.reef.io.network.group.impl.utils.Utils;
+import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
+import org.apache.reef.io.serialization.Codec;
+import org.apache.reef.tang.annotations.Name;
+
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Logger;
+
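+/**
+ * Default {@link OperatorTopologyStruct} implementation. Keeps track of the
+ * parent and children of this node for a single operator, applies topology
+ * control messages to that structure and exchanges data with the neighbouring
+ * nodes through the configured {@link Sender}.
+ */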
+public class OperatorTopologyStructImpl implements OperatorTopologyStruct {
+
+  private static final int SMALL_MSG_LENGTH = 1 << 20; // payloads longer than this many bytes use the readiness/ACK handshake in sendToNode
+
+  private static final Logger LOG = 
Logger.getLogger(OperatorTopologyStructImpl.class.getName());
+
+  private final Class<? extends Name<String>> groupName; // name class of the communication group
+  private final Class<? extends Name<String>> operName; // name class of the operator
+  private final String selfId; // id of this node; used as source id of outgoing messages
+  private final String driverId; // id of the driver, only exposed through getDriverId()
+  private final Sender sender; // used for every outgoing message
+
+  private boolean changes = true; // flag read by hasChanges() and written by setChanges()
+  private NodeStruct parent; // null until a ParentAdd arrives or after a ParentDead
+  private final List<NodeStruct> children = new ArrayList<>(); // current child nodes
+
+  private final BlockingQueue<NodeStruct> nodesWithData = new 
LinkedBlockingQueue<>(); // nodes queued by addAsData, consumed by recvFromChildren
+  private final Set<String> childrenToRcvFrom = new HashSet<>(); // ids of children recvFromChildren is still waiting for
+
+  private final ConcurrentMap<String, Set<Integer>> deadMsgs = new 
ConcurrentHashMap<>(); // source id -> versions of dead msgs that arrived before the matching add
+
+  private final int version; // version sent as source version of outgoing messages
+
+  /**
+   * Creates a topology structure that has neither a parent nor children yet.
+   *
+   * @param groupName name class of the communication group
+   * @param operName  name class of the operator
+   * @param selfId    identifier of this node
+   * @param driverId  identifier of the driver
+   * @param sender    sender used for all outgoing messages
+   * @param version   version reported for this node in outgoing messages
+   */
+  public OperatorTopologyStructImpl(
+      final Class<? extends Name<String>> groupName,
+      final Class<? extends Name<String>> operName,
+      final String selfId,
+      final String driverId,
+      final Sender sender,
+      final int version) {
+    // The implicit super() call is sufficient here.
+    this.version = version;
+    this.sender = sender;
+    this.driverId = driverId;
+    this.selfId = selfId;
+    this.operName = operName;
+    this.groupName = groupName;
+  }
+
+  public OperatorTopologyStructImpl(final OperatorTopologyStruct topology) {
+    super();
+    this.groupName = topology.getGroupName();
+    this.operName = topology.getOperName();
+    this.selfId = topology.getSelfId();
+    this.driverId = topology.getDriverId();
+    this.sender = topology.getSender();
+    this.changes = topology.hasChanges();
+    this.parent = topology.getParent();
+    this.children.addAll(topology.getChildren());
+    this.version = topology.getVersion();
+  }
+
+  /**
+   * @return a short description made of group name, operator name, own id and version
+   */
+  @Override
+  public String toString() {
+    final StringBuilder description = new StringBuilder("OperatorTopologyStruct - ");
+    description.append(Utils.simpleName(groupName));
+    description.append(":");
+    description.append(Utils.simpleName(operName));
+    description.append("(").append(selfId).append(",").append(version).append(")");
+    return description.toString();
+  }
+
+  /**
+   * @return the current parent node, or {@code null} when none is set
+   */
+  @Override
+  public NodeStruct getParent() {
+    return this.parent;
+  }
+
+  /**
+   * @return the internal list of child nodes (not a copy)
+   */
+  @Override
+  public Collection<? extends NodeStruct> getChildren() {
+    return this.children;
+  }
+
+  /**
+   * @return the name class of the communication group
+   */
+  @Override
+  public Class<? extends Name<String>> getGroupName() {
+    return this.groupName;
+  }
+
+  /**
+   * @return the name class of the operator
+   */
+  @Override
+  public Class<? extends Name<String>> getOperName() {
+    return this.operName;
+  }
+
+  /**
+   * @return the identifier of this node
+   */
+  @Override
+  public String getSelfId() {
+    return this.selfId;
+  }
+
+  /**
+   * @return the identifier of the driver
+   */
+  @Override
+  public String getDriverId() {
+    return this.driverId;
+  }
+
+  /**
+   * @return the sender used for outgoing messages
+   */
+  @Override
+  public Sender getSender() {
+    return this.sender;
+  }
+
+  /**
+   * @return the current value of the change flag (initially {@code true})
+   */
+  @Override
+  public boolean hasChanges() {
+    final String qualifiedName = getQualifiedName();
+    LOG.entering("OperatorTopologyStructImpl", "hasChanges", qualifiedName);
+    final Object[] exitInfo = {this.changes, qualifiedName};
+    LOG.exiting("OperatorTopologyStructImpl", "hasChanges", Arrays.toString(exitInfo));
+    return this.changes;
+  }
+
+  /**
+   * @return the version of this topology structure
+   */
+  @Override
+  public int getVersion() {
+    return this.version;
+  }
+
+  /**
+   * Hands a data message to the node (parent or child) it came from.
+   * The node is first queued in {@code nodesWithData} and then receives the
+   * message through {@link NodeStruct#addData}. Messages whose source is
+   * neither the parent nor a known child are only logged.
+   *
+   * @param msg the data message to store
+   * @throws RuntimeException if the thread is interrupted while queueing the
+   *                          node; the interrupt status is restored first
+   */
+  @Override
+  public void addAsData(final GroupCommunicationMessage msg) {
+    LOG.entering("OperatorTopologyStructImpl", "addAsData", new Object[]{getQualifiedName(), msg});
+    final String srcId = msg.getSrcid();
+    final NodeStruct node = findNode(srcId);
+    if (node != null) {
+      try {
+        nodesWithData.put(node);
+        LOG.finest(getQualifiedName() + "Added node " + srcId + " to nodesWithData queue");
+      } catch (final InterruptedException e) {
+        // Wrapping the exception clears the interrupt for callers further up
+        // the stack, so set the flag again before propagating.
+        Thread.currentThread().interrupt();
+        throw new RuntimeException("InterruptedException while adding to childrenWithData queue", e);
+      }
+      node.addData(msg);
+    } else {
+      LOG.fine("Unable to find node " + srcId + " to send " + msg.getType() + " to");
+    }
+    LOG.exiting("OperatorTopologyStructImpl", "addAsData", Arrays.toString(new Object[]{getQualifiedName(), msg}));
+  }
+
+  /**
+   * Looks up the node with the given id, checking the parent first and the
+   * children afterwards.
+   *
+   * @param srcId id of the node to look for
+   * @return the matching node, or {@code null} if neither parent nor any child matches
+   */
+  private NodeStruct findNode(final String srcId) {
+    LOG.entering("OperatorTopologyStructImpl", "findNode", new Object[]{getQualifiedName(), srcId});
+    final boolean isParent = parent != null && parent.getId().equals(srcId);
+    final NodeStruct match = isParent ? parent : findChild(srcId);
+    LOG.exiting("OperatorTopologyStructImpl", "findNode",
+        Arrays.toString(new Object[]{match, getQualifiedName(), srcId}));
+    return match;
+  }
+
+  /**
+   * Sends {@code data} to {@code node}. Payloads longer than
+   * {@code SMALL_MSG_LENGTH} are wrapped in a handshake: an empty message
+   * announces the payload, the reply of the node is awaited, the payload is
+   * sent and finally an acknowledgement is awaited. If either reply is
+   * {@code null} the method returns without completing the remaining steps.
+   *
+   * @param data    payload to send
+   * @param msgType type to stamp on every message sent
+   * @param node    destination node
+   * @throws RuntimeException wrapping a {@link NetworkException} raised by the sender
+   */
+  private void sendToNode(final byte[] data,
+                          final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType,
+                          final NodeStruct node) {
+    LOG.entering("OperatorTopologyStructImpl", "sendToNode",
+        new Object[]{getQualifiedName(), data, msgType, node});
+    final String nodeId = node.getId();
+    final boolean largeMsg = data.length > SMALL_MSG_LENGTH;
+    try {
+      if (largeMsg) {
+        LOG.finest(getQualifiedName() + "Msg too big. Sending readiness to send " + msgType + " msg to " + nodeId);
+        sender.send(Utils.bldVersionedGCM(groupName, operName, msgType, selfId, version, nodeId,
+            node.getVersion(), Utils.EmptyByteArr));
+        final byte[] readiness = receiveFromNode(node, true);
+        if (readiness == null) {
+          LOG.exiting("OperatorTopologyStructImpl", "sendToNode",
+              Arrays.toString(new Object[]{getQualifiedName(), data, msgType, node}));
+          return;
+        }
+        LOG.finest(getQualifiedName() + "Got readiness to accept " + msgType + " msg from " + nodeId
+            + ". Will send actual msg now");
+      }
+
+      sender.send(Utils.bldVersionedGCM(groupName, operName, msgType, selfId, version, nodeId,
+          node.getVersion(), data));
+
+      if (largeMsg) {
+        LOG.finest(getQualifiedName() + "Msg too big. Will wait for ACK before queing up one more msg");
+        final byte[] ack = receiveFromNode(node, true);
+        if (ack == null) {
+          LOG.exiting("OperatorTopologyStructImpl", "sendToNode",
+              Arrays.toString(new Object[]{getQualifiedName(), data, msgType, node}));
+          return;
+        }
+        LOG.finest(getQualifiedName() + "Got " + msgType + " msg received ACK from " + nodeId
+            + ". Will move to next msg if it exists");
+      }
+    } catch (final NetworkException e) {
+      throw new RuntimeException(
+          "NetworkException while sending " + msgType + " data from " + selfId + " to " + nodeId, e);
+    }
+    LOG.exiting("OperatorTopologyStructImpl", "sendToNode",
+        Arrays.toString(new Object[]{getQualifiedName(), data, msgType, node}));
+  }
+
+  /**
+   * Fetches the next piece of data from {@code node} via
+   * {@link NodeStruct#getData()} and optionally removes one occurrence of the
+   * node from the {@code nodesWithData} queue.
+   *
+   * @param node   node to read from
+   * @param remove whether the node should also be removed from {@code nodesWithData}
+   * @return whatever {@code node.getData()} returned, possibly {@code null}
+   */
+  private byte[] receiveFromNode(final NodeStruct node, final boolean remove) {
+    LOG.entering("OperatorTopologyStructImpl", "receiveFromNode",
+        new Object[]{getQualifiedName(), node, remove});
+    final byte[] payload = node.getData();
+    if (remove) {
+      final boolean removed = nodesWithData.remove(node);
+      final String report = getQualifiedName() + "Removed(" + removed + ") node " + node.getId()
+          + " from nodesWithData queue";
+      // A failed removal is logged at the more visible level.
+      if (!removed) {
+        LOG.fine(report);
+      } else {
+        LOG.finest(report);
+      }
+    }
+    LOG.exiting("OperatorTopologyStructImpl", "receiveFromNode",
+        Arrays.toString(new Object[]{payload, getQualifiedName(), node, remove}));
+    return payload;
+  }
+
+  /**
+   * Sends {@code data} to the parent node. When no parent is set the call
+   * only writes a log entry.
+   *
+   * @param data    payload to send
+   * @param msgType type of the message
+   */
+  @Override
+  public void sendToParent(final byte[] data, final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType) {
+    LOG.entering("OperatorTopologyStructImpl", "sendToParent",
+        new Object[]{getQualifiedName(), data, msgType});
+    if (parent == null) {
+      LOG.fine(getQualifiedName() + "Perhaps parent has died or has not been configured");
+    } else {
+      sendToNode(data, msgType, parent);
+    }
+    LOG.exiting("OperatorTopologyStructImpl", "sendToParent",
+        Arrays.toString(new Object[]{getQualifiedName(), data, msgType}));
+  }
+
+  /**
+   * Sends {@code data} to every child, one after the other in list order.
+   *
+   * @param data    payload to send
+   * @param msgType type of the message
+   */
+  @Override
+  public void sendToChildren(final byte[] data, final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType) {
+    LOG.entering("OperatorTopologyStructImpl", "sendToChildren",
+        new Object[]{getQualifiedName(), data, msgType});
+    final Iterator<NodeStruct> remaining = children.iterator();
+    while (remaining.hasNext()) {
+      sendToNode(data, msgType, remaining.next());
+    }
+    LOG.exiting("OperatorTopologyStructImpl", "sendToChildren",
+        Arrays.toString(new Object[]{getQualifiedName(), data, msgType}));
+  }
+
+  @Override
+  public byte[] recvFromParent() {
+    LOG.entering("OperatorTopologyStructImpl", "recvFromParent", 
getQualifiedName());
+    LOG.finest(getQualifiedName() + "Waiting for " + parent.getId() + " to 
send data");
+    byte[] retVal = receiveFromNode(parent, true);
+    if (retVal != null && retVal.length == 0) {
+      LOG.finest(getQualifiedName() + "Got msg that parent " + parent.getId()
+          + " has large data and is ready to send data. Sending Ack to receive 
data");
+      sendToNode(Utils.EmptyByteArr, 
ReefNetworkGroupCommProtos.GroupCommMessage.Type.Broadcast, parent);
+      retVal = receiveFromNode(parent, true);
+      if (retVal != null) {
+        LOG.finest(getQualifiedName() + "Received large msg from Parent " + 
parent.getId()
+            + ". Will return it after ACKing it");
+        sendToNode(Utils.EmptyByteArr, 
ReefNetworkGroupCommProtos.GroupCommMessage.Type.Broadcast, parent);
+      }
+    }
+    LOG.exiting("OperatorTopologyStructImpl", "recvFromParent",
+        Arrays.toString(new Object[]{retVal, getQualifiedName()}));
+    return retVal;
+  }
+
+  /**
+   * Collects one contribution from every current child and folds the decoded
+   * values together with {@code redFunc}.
+   * <p>
+   * Nodes are taken from {@code nodesWithData} as they become available. An
+   * empty (zero length) message announces a large payload and triggers the
+   * same ACK / receive / ACK exchange as in {@code recvFromParent}, using
+   * Reduce messages. As soon as two values are available they are reduced to
+   * one, so at most two decoded values are held at any time.
+   *
+   * @param redFunc   function used to combine decoded values
+   * @param dataCodec codec used to decode the received bytes
+   * @param <T>       type of the reduced values
+   * @return the reduced value, or {@code null} if no child delivered data
+   * @throws RuntimeException if the thread is interrupted while waiting for a
+   *                          node; the interrupt status is restored first
+   */
+  @Override
+  public <T> T recvFromChildren(final ReduceFunction<T> redFunc, final Codec<T> dataCodec) {
+    LOG.entering("OperatorTopologyStructImpl", "recvFromChildren",
+        new Object[]{getQualifiedName(), redFunc, dataCodec});
+    final List<T> retLst = new ArrayList<>(2);
+    for (final NodeStruct child : children) {
+      childrenToRcvFrom.add(child.getId());
+    }
+
+    while (!childrenToRcvFrom.isEmpty()) {
+      LOG.finest(getQualifiedName() + "Waiting for some child to send data");
+      final NodeStruct child;
+      try {
+        child = nodesWithData.take();
+      } catch (final InterruptedException e) {
+        // Wrapping the exception clears the interrupt for callers further up
+        // the stack, so set the flag again before propagating.
+        Thread.currentThread().interrupt();
+        throw new RuntimeException("InterruptedException while waiting to take data from nodesWithData queue", e);
+      }
+      byte[] retVal = receiveFromNode(child, false);
+      if (retVal != null && retVal.length == 0) {
+        LOG.finest(getQualifiedName() + "Got msg that child " + child.getId()
+            + " has large data and is ready to send data. Sending Ack to receive data");
+        sendToNode(Utils.EmptyByteArr, ReefNetworkGroupCommProtos.GroupCommMessage.Type.Reduce, child);
+        retVal = receiveFromNode(child, true);
+        if (retVal != null) {
+          LOG.finest(getQualifiedName() + "Received large msg from child " + child.getId()
+              + ". Will reduce it after ACKing it");
+          sendToNode(Utils.EmptyByteArr, ReefNetworkGroupCommProtos.GroupCommMessage.Type.Reduce, child);
+        } else {
+          LOG.finest(getQualifiedName() + "Will not reduce it");
+        }
+      }
+      if (retVal != null) {
+        retLst.add(dataCodec.decode(retVal));
+        // Reduce eagerly so the list never grows beyond two entries.
+        if (retLst.size() == 2) {
+          final T redVal = redFunc.apply(retLst);
+          retLst.clear();
+          retLst.add(redVal);
+        }
+      }
+      childrenToRcvFrom.remove(child.getId());
+    }
+    final T retVal = retLst.isEmpty() ? null : retLst.get(0);
+    LOG.exiting("OperatorTopologyStructImpl", "recvFromChildren",
+        Arrays.toString(new Object[]{retVal, getQualifiedName(), redFunc, dataCodec}));
+    return retVal;
+  }
+
+  /**
+   * Checks whether a dead message with the given source id and version was
+   * recorded earlier and, if so, removes that record.
+   *
+   * @param msgSrcId      source id of the add message being processed
+   * @param msgSrcVersion source version of the add message being processed
+   * @return {@code true} if a matching dead message was found and removed
+   */
+  private boolean removedDeadMsg(final String msgSrcId, final int msgSrcVersion) {
+    LOG.entering("OperatorTopologyStructImpl", "removedDeadMsg",
+        new Object[]{getQualifiedName(), msgSrcId, msgSrcVersion});
+    final Set<Integer> pendingVersions = deadMsgs.get(msgSrcId);
+    final boolean removed;
+    if (pendingVersions == null) {
+      LOG.finest(getQualifiedName() + "No dead msgs waiting for add.");
+      removed = false;
+    } else {
+      LOG.fine(getQualifiedName() + "Found dead msgs " + pendingVersions + " waiting for add");
+      removed = pendingVersions.remove(msgSrcVersion);
+      if (removed) {
+        LOG.fine(getQualifiedName() + "Found dead msg with same version as srcVer-" + msgSrcVersion);
+      } else {
+        LOG.finest(getQualifiedName() + "No dead msg with same version as srcVer-" + msgSrcVersion);
+      }
+    }
+    LOG.exiting("OperatorTopologyStructImpl", "removedDeadMsg",
+        Arrays.toString(new Object[]{removed, getQualifiedName(), msgSrcId, msgSrcVersion}));
+    return removed;
+  }
+
+  private void addToDeadMsgs(final String srcId, final int version) {
+    LOG.entering("OperatorTopologyStructImpl", "addToDeadMsgs", new 
Object[]{getQualifiedName(), srcId, version});
+    deadMsgs.putIfAbsent(srcId, new HashSet<Integer>());
+    deadMsgs.get(srcId).add(version);
+    LOG.exiting("OperatorTopologyStructImpl", "addToDeadMsgs", 
Arrays.toString(new Object[]{getQualifiedName(),
+        srcId, version}));
+  }
+
+  /**
+   * Decides whether a dead message arrived out of sequence and, if so,
+   * records it. That is the case when the node is unknown ({@code null}) or
+   * when the message carries a higher version than the node.
+   *
+   * @param node          node the dead message refers to, may be {@code null}
+   * @param msgSrcId      source id of the dead message
+   * @param msgSrcVersion source version of the dead message
+   * @return {@code true} if the message was recorded in the dead messages
+   */
+  private boolean addedToDeadMsgs(final NodeStruct node, final String msgSrcId, final int msgSrcVersion) {
+    LOG.entering("OperatorTopologyStructImpl", "addedToDeadMsgs",
+        new Object[]{getQualifiedName(), node, msgSrcId, msgSrcVersion});
+    final boolean queued;
+    if (node == null) {
+      LOG.warning(getQualifiedName() + "Got dead msg when no node existed. OOS Queing up for add to handle");
+      addToDeadMsgs(msgSrcId, msgSrcVersion);
+      queued = true;
+    } else {
+      final int nodeVersion = node.getVersion();
+      if (msgSrcVersion > nodeVersion) {
+        LOG.warning(getQualifiedName() + "Got an OOS dead msg. Has HIGHER ver-" + msgSrcVersion
+            + " than node ver-" + nodeVersion + ". Queing up for add to handle");
+        addToDeadMsgs(msgSrcId, msgSrcVersion);
+        queued = true;
+      } else {
+        queued = false;
+      }
+    }
+    LOG.exiting("OperatorTopologyStructImpl", "addedToDeadMsgs",
+        Arrays.toString(new Object[]{queued, getQualifiedName(), node, msgSrcId, msgSrcVersion}));
+    return queued;
+  }
+
+  /**
+   * Applies a single control message (ParentAdd, ParentDead, ChildAdd or
+   * ChildDead) to the topology structure. No assumption is made about the
+   * order in which messages arrive; out of sequence messages are handled by
+   * the individual update methods.
+   *
+   * @param msg the control message to apply
+   * @throws RuntimeException if the message has no source version or is not
+   *                          one of the four control message types
+   */
+  @Override
+  public void update(final GroupCommunicationMessage msg) {
+    if (!msg.hasSrcVersion()) {
+      throw new RuntimeException(getQualifiedName() + "can only deal with msgs that have src version set");
+    }
+    final String srcId = msg.getSrcid();
+    final int srcVersion = msg.getSrcVersion();
+    LOG.finest(getQualifiedName() + "Updating " + msg.getType() + " msg from " + srcId);
+    LOG.finest(getQualifiedName() + "Before update: parent=" + ((parent != null) ? parent.getId() : "NULL"));
+    LOG.finest(getQualifiedName() + "Before update: children=" + children);
+    switch (msg.getType()) {
+      case ParentAdd:
+        updateParentAdd(srcId, srcVersion);
+        break;
+      case ParentDead:
+        updateParentDead(srcId, srcVersion);
+        break;
+      case ChildAdd:
+        updateChildAdd(srcId, srcVersion);
+        break;
+      case ChildDead:
+        updateChildDead(srcId, srcVersion);
+        break;
+      default:
+        throw new RuntimeException("Received a non control message in update");
+    }
+    LOG.finest(getQualifiedName() + "After update: parent=" + ((parent != null) ? parent.getId() : "NULL"));
+    LOG.finest(getQualifiedName() + "After update: children=" + children);
+  }
+
+  /**
+   * Handles a ChildDead message. The child is removed unless the message is
+   * older than the child currently known; a message that had to be recorded
+   * as out of sequence still removes the child.
+   *
+   * @param srcId      id of the child reported dead
+   * @param srcVersion version carried by the message
+   */
+  private void updateChildDead(final String srcId, final int srcVersion) {
+    LOG.entering("OperatorTopologyStructImpl", "updateChildDead",
+        new Object[]{getQualifiedName(), srcId, srcVersion});
+    final NodeStruct deadChild = findChild(srcId);
+    if (addedToDeadMsgs(deadChild, srcId, srcVersion)) {
+      LOG.fine(getQualifiedName() + "Added to dead msgs. Removing child node since ChildAdd might not turn up");
+    } else {
+      final int childVersion = deadChild.getVersion();
+      if (srcVersion < childVersion) {
+        LOG.finest(getQualifiedName() + "Got an OOS child dead msg. Has LOWER ver-" + srcVersion
+            + " than child ver-" + childVersion + ". Discarding");
+        LOG.exiting("OperatorTopologyStructImpl", "updateChildDead",
+            Arrays.toString(new Object[]{getQualifiedName(), srcId, srcVersion}));
+        return;
+      }
+      LOG.finest(getQualifiedName() + "Got a child dead msg. Has SAME ver-" + srcVersion + " as child ver-"
+          + childVersion + "Removing child node");
+    }
+    children.remove(deadChild);
+    LOG.exiting("OperatorTopologyStructImpl", "updateChildDead",
+        Arrays.toString(new Object[]{getQualifiedName(), srcId, srcVersion}));
+  }
+
+  /**
+   * Handles a ChildAdd message. If a dead message of the same version was
+   * recorded earlier, the add is cancelled out. Otherwise a new child is
+   * created, or - when the child already exists - an older message is
+   * discarded and a newer one bumps the version of the existing child.
+   *
+   * @param srcId      id of the child to add
+   * @param srcVersion version carried by the message
+   * @throws RuntimeException if the child already exists with the same version
+   */
+  private void updateChildAdd(final String srcId, final int srcVersion) {
+    LOG.entering("OperatorTopologyStructImpl", "updateChildAdd",
+        new Object[]{getQualifiedName(), srcId, srcVersion});
+    if (removedDeadMsg(srcId, srcVersion)) {
+      LOG.warning(getQualifiedName() + "Removed dead msg. Not adding child");
+    } else {
+      final NodeStruct existing = findChild(srcId);
+      if (existing == null) {
+        LOG.finest(getQualifiedName() + "Creating new child node for " + srcId);
+        children.add(new ChildNodeStruct(srcId, srcVersion));
+      } else {
+        LOG.warning(getQualifiedName() + "Child already exists");
+        final int childVersion = existing.getVersion();
+        if (srcVersion == childVersion) {
+          throw new RuntimeException(getQualifiedName() + "Got two child add msgs of same version-" + srcVersion);
+        }
+        if (srcVersion < childVersion) {
+          LOG.fine(getQualifiedName() + "Got an OOS child add msg. Has LOWER ver-" + srcVersion
+              + " than child ver-" + childVersion + ". Discarding");
+        } else {
+          LOG.fine(getQualifiedName() + "Got an OOS child add msg. Has HIGHER ver-" + srcVersion
+              + " than child ver-" + childVersion + ". Bumping up version number");
+          existing.setVersion(srcVersion);
+        }
+      }
+    }
+    LOG.exiting("OperatorTopologyStructImpl", "updateChildAdd",
+        Arrays.toString(new Object[]{getQualifiedName(), srcId, srcVersion}));
+  }
+
+  /**
+   * Handles a ParentDead message. The parent is cleared unless the message is
+   * older than the parent currently known; a message that had to be recorded
+   * as out of sequence still clears the parent.
+   *
+   * @param srcId      id of the parent reported dead
+   * @param srcVersion version carried by the message
+   */
+  private void updateParentDead(final String srcId, final int srcVersion) {
+    LOG.entering("OperatorTopologyStructImpl", "updateParentDead",
+        new Object[]{getQualifiedName(), srcId, srcVersion});
+    if (addedToDeadMsgs(parent, srcId, srcVersion)) {
+      LOG.warning(getQualifiedName()
+          + "Added to dead msgs. Setting parent to null since ParentAdd might not turn up");
+    } else {
+      final int parentVersion = parent.getVersion();
+      if (srcVersion < parentVersion) {
+        LOG.fine(getQualifiedName() + "Got an OOS parent dead msg. Has LOWER ver-" + srcVersion
+            + " than parent ver-" + parentVersion + ". Discarding");
+        LOG.exiting("OperatorTopologyStructImpl", "updateParentDead",
+            Arrays.toString(new Object[]{getQualifiedName(), srcId, srcVersion}));
+        return;
+      }
+      LOG.finest(getQualifiedName() + "Got a parent dead msg. Has SAME ver-" + srcVersion + " as parent ver-"
+          + parentVersion + "Setting parent node to null");
+    }
+    parent = null;
+    LOG.exiting("OperatorTopologyStructImpl", "updateParentDead",
+        Arrays.toString(new Object[]{getQualifiedName(), srcId, srcVersion}));
+  }
+
+  /**
+   * Handles a ParentAdd message. If a dead message of the same version was
+   * recorded earlier, the add is cancelled out. Otherwise a new parent is
+   * created, or - when a parent already exists - an older message is
+   * discarded and a newer one bumps the version of the existing parent.
+   *
+   * @param srcId      id of the parent to add
+   * @param srcVersion version carried by the message
+   * @throws RuntimeException if a parent already exists with the same version
+   */
+  private void updateParentAdd(final String srcId, final int srcVersion) {
+    LOG.entering("OperatorTopologyStructImpl", "updateParentAdd",
+        new Object[]{getQualifiedName(), srcId, srcVersion});
+    if (removedDeadMsg(srcId, srcVersion)) {
+      LOG.fine(getQualifiedName() + "Removed dead msg. Not adding parent");
+    } else if (parent == null) {
+      LOG.finest(getQualifiedName() + "Creating new parent node for " + srcId);
+      parent = new ParentNodeStruct(srcId, srcVersion);
+    } else {
+      LOG.fine(getQualifiedName() + "Parent already exists");
+      final int parentVersion = parent.getVersion();
+      if (srcVersion == parentVersion) {
+        throw new RuntimeException(getQualifiedName() + "Got two parent add msgs of same version-" + srcVersion);
+      }
+      if (srcVersion < parentVersion) {
+        LOG.fine(getQualifiedName() + "Got an OOS parent add msg. Has LOWER ver-" + srcVersion
+            + " than parent ver-" + parentVersion + ". Discarding");
+      } else {
+        LOG.fine(getQualifiedName() + "Got an OOS parent add msg. Has HIGHER ver-" + srcVersion
+            + " than parent ver-" + parentVersion + ". Bumping up version number");
+        parent.setVersion(srcVersion);
+      }
+    }
+    LOG.exiting("OperatorTopologyStructImpl", "updateParentAdd",
+        Arrays.toString(new Object[]{getQualifiedName(), srcId, srcVersion}));
+  }
+
+  /**
+   * @param srcId
+   * @return
+   */
+  private NodeStruct findChild(final String srcId) {
+    LOG.entering("OperatorTopologyStructImpl", "findChild", new 
Object[]{getQualifiedName(), srcId});
+    NodeStruct retVal = null;
+    for (final NodeStruct node : children) {
+      if (node.getId().equals(srcId)) {
+        retVal = node;
+        break;
+      }
+    }
+    LOG.exiting("OperatorTopologyStructImpl", "findChild", Arrays.toString(new 
Object[]{retVal, getQualifiedName(),
+        srcId}));
+    return retVal;
+  }
+
+  /**
+   * Applies every message of the given set through
+   * {@link #update(GroupCommunicationMessage)}, in the iteration order of the set.
+   *
+   * @param deletionDeltas the control messages to apply
+   */
+  @Override
+  public void update(final Set<GroupCommunicationMessage> deletionDeltas) {
+    LOG.entering("OperatorTopologyStructImpl", "update",
+        new Object[]{"Updating topology with deleting msgs", getQualifiedName(), deletionDeltas});
+    final Iterator<GroupCommunicationMessage> pending = deletionDeltas.iterator();
+    while (pending.hasNext()) {
+      update(pending.next());
+    }
+    LOG.exiting("OperatorTopologyStructImpl", "update",
+        Arrays.toString(new Object[]{getQualifiedName(), deletionDeltas}));
+  }
+
+  /**
+   * Sets the change flag returned by {@link #hasChanges()}.
+   *
+   * @param changes the new value of the flag
+   */
+  @Override
+  public void setChanges(final boolean changes) {
+    final Object[] entryInfo = {getQualifiedName(), changes};
+    LOG.entering("OperatorTopologyStructImpl", "setChanges", entryInfo);
+    this.changes = changes;
+    final Object[] exitInfo = {getQualifiedName(), changes};
+    LOG.exiting("OperatorTopologyStructImpl", "setChanges", Arrays.toString(exitInfo));
+  }
+
+  private String getQualifiedName() {
+    return Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + 
":" + selfId + ":ver(" + version + ") - ";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/ParentNodeStruct.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/ParentNodeStruct.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/ParentNodeStruct.java
new file mode 100644
index 0000000..999f49d
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/ParentNodeStruct.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.task;
+
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
+
+import java.util.logging.Logger;
+
+/**
+ * Node structure used for the parent of a node in an operator topology.
+ * A message marks this node as dead when its type is {@code ParentDead}.
+ */
+public class ParentNodeStruct extends NodeStructImpl {
+
+  private static final Logger LOG = Logger.getLogger(ParentNodeStruct.class.getName());
+
+  /**
+   * @param id      identifier of the parent node
+   * @param version version of the parent node
+   */
+  public ParentNodeStruct(final String id, final int version) {
+    super(id, version);
+  }
+
+  /**
+   * Tells whether the given message reports the death of the parent.
+   *
+   * @param gcm the message to inspect
+   * @return {@code true} if and only if the message type is {@code ParentDead}
+   */
+  @Override
+  public boolean checkDead(final GroupCommunicationMessage gcm) {
+    LOG.entering("ParentNodeStruct", "checkDead", gcm);
+    final boolean retVal = gcm.getType() == ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentDead;
+    // Logger.exiting expects the value being returned, not the argument.
+    LOG.exiting("ParentNodeStruct", "checkDead", retVal);
+    return retVal;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/BroadcastingEventHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/BroadcastingEventHandler.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/BroadcastingEventHandler.java
new file mode 100644
index 0000000..cdc46fc
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/BroadcastingEventHandler.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.utils;
+
+import org.apache.reef.wake.EventHandler;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * An {@link EventHandler} that forwards every event to all handlers
+ * registered through {@link #addHandler}, in registration order and on the
+ * calling thread. The handler list is a plain {@link ArrayList}; no
+ * synchronization is performed by this class.
+ *
+ * @param <T> type of the events being forwarded
+ */
+public class BroadcastingEventHandler<T> implements EventHandler<T> {
+
+  // Private and final: handlers are only registered through addHandler().
+  private final List<EventHandler<T>> handlers = new ArrayList<>();
+
+  /**
+   * Registers a handler that will receive all subsequent events.
+   *
+   * @param handler the handler to add
+   */
+  public void addHandler(final EventHandler<T> handler) {
+    handlers.add(handler);
+  }
+
+  /**
+   * Passes {@code msg} to every registered handler.
+   *
+   * @param msg the event to forward
+   */
+  @Override
+  public void onNext(final T msg) {
+    for (final EventHandler<T> handler : handlers) {
+      handler.onNext(msg);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ConcurrentCountingMap.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ConcurrentCountingMap.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ConcurrentCountingMap.java
new file mode 100644
index 0000000..7dc651b
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ConcurrentCountingMap.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.utils;
+
+import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Utility map class that wraps a {@link CountingMap} in a
+ * {@link ConcurrentMap}.
+ * Equivalent to {@code Map<K, Map<V, Integer>>}.
+ *
+ * Each method looks the inner map up exactly once, so a key that is removed
+ * by another thread between two steps of the same call can no longer cause
+ * a {@code NullPointerException}. The inner {@link CountingMap} instances
+ * are not synchronized by this class; callers that update the same key from
+ * several threads still need their own coordination.
+ */
+public class ConcurrentCountingMap<K, V> {
+
+  private final ConcurrentMap<K, CountingMap<V>> map = new ConcurrentHashMap<>();
+
+  /**
+   * Decrements the count of {@code value} under {@code key} and drops the
+   * key once its inner map becomes empty.
+   *
+   * @param key   the outer key
+   * @param value the value whose count is decremented
+   * @return {@code false} if the key is absent, otherwise whatever the inner
+   *         map's {@code remove} reports
+   */
+  public boolean remove (final K key, final V value) {
+    final CountingMap<V> counts = map.get(key);
+    if (counts == null) {
+      return false;
+    }
+    final boolean retVal = counts.remove(value);
+    if (counts.isEmpty()) {
+      // Only drop the mapping if it still points at the map we just emptied.
+      map.remove(key, counts);
+    }
+    return retVal;
+  }
+
+  /**
+   * Increments the count of {@code value} under {@code key}, creating the
+   * inner map on first use.
+   *
+   * @param key   the outer key
+   * @param value the value whose count is incremented
+   */
+  public void add (final K key, final V value) {
+    CountingMap<V> counts = map.get(key);
+    if (counts == null) {
+      final CountingMap<V> fresh = new CountingMap<>();
+      final CountingMap<V> existing = map.putIfAbsent(key, fresh);
+      // putIfAbsent returns the previous mapping, or null if ours was installed.
+      counts = (existing == null) ? fresh : existing;
+    }
+    counts.add(value);
+  }
+
+  /**
+   * @param key the outer key
+   * @return the inner counting map for {@code key}, or {@code null} if absent
+   */
+  public CountingMap<V> get (final K key) {
+    return map.get(key);
+  }
+
+  /**
+   * @return {@code true} if no key is currently present
+   */
+  public boolean isEmpty () {
+    return map.isEmpty();
+  }
+
+  /**
+   * @param key the outer key
+   * @return {@code true} if {@code key} is currently present
+   */
+  public boolean containsKey (final K key) {
+    return map.containsKey(key);
+  }
+
+  /**
+   * @param key   the outer key
+   * @param value the value to look for under {@code key}
+   * @return {@code true} if {@code key} is present and its inner map
+   *         contains {@code value}
+   */
+  public boolean contains (final K key, final V value) {
+    final CountingMap<V> counts = map.get(key);
+    return counts != null && counts.containsKey(value);
+  }
+
+  /**
+   * @param value the value to look for under every key
+   * @return {@code true} if no inner map contains {@code value}
+   */
+  public boolean notContains (final V value) {
+    for (final CountingMap<V> innerMap : map.values()) {
+      if (innerMap.containsKey(value)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public String toString () {
+    return map.toString();
+  }
+
+  /**
+   * Clears every inner map and then the outer map.
+   */
+  public void clear () {
+    for (final CountingMap<V> value : map.values()) {
+      value.clear();
+    }
+    map.clear();
+  }
+
+  /**
+   * Small manual demo that logs the map after a fixed sequence of
+   * add/remove calls.
+   *
+   * @param args unused
+   */
+  public static void main (final String[] args) {
+    final Logger LOG = Logger.getLogger(ConcurrentCountingMap.class.getName());
+    final ReefNetworkGroupCommProtos.GroupCommMessage.Type childAdd =
+        ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd;
+    final ReefNetworkGroupCommProtos.GroupCommMessage.Type childDead =
+        ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildDead;
+    final ConcurrentCountingMap<ReefNetworkGroupCommProtos.GroupCommMessage.Type, String> strMap =
+        new ConcurrentCountingMap<>();
+    LOG.log(Level.INFO, "OUT: {0}", strMap.isEmpty());
+    strMap.add(childAdd, "ST0");
+    LOG.log(Level.INFO, "OUT: {0}", strMap);
+    strMap.add(childAdd, "ST1");
+    LOG.log(Level.INFO, "OUT: {0}", strMap);
+    strMap.add(childDead, "ST0");
+    LOG.log(Level.INFO, "OUT: {0}", strMap);
+    strMap.add(childDead, "ST1");
+    LOG.log(Level.INFO, "OUT: {0}", strMap);
+    strMap.add(childAdd, "ST2");
+    LOG.log(Level.INFO, "OUT: {0}", strMap);
+    strMap.add(childAdd, "ST3");
+    LOG.log(Level.INFO, "OUT: {0}", strMap);
+    strMap.add(childAdd, "ST0");
+    LOG.log(Level.INFO, "OUT: {0}", strMap);
+    strMap.add(childAdd, "ST1");
+    LOG.log(Level.INFO, "OUT: {0}", strMap);
+    strMap.remove(childAdd, "ST2");
+    LOG.log(Level.INFO, "OUT: {0}", strMap);
+    strMap.remove(childAdd, "ST3");
+    LOG.log(Level.INFO, "OUT: {0}", strMap);
+    strMap.remove(childAdd, "ST0");
+    LOG.log(Level.INFO, "OUT: {0}", strMap);
+    LOG.log(Level.INFO, "OUT: {0}", strMap.get(childAdd));
+    LOG.log(Level.INFO, "OUT: {0}", strMap.get(childDead));
+    strMap.remove(childAdd, "ST1");
+    LOG.log(Level.INFO, "OUT: {0}", strMap);
+    strMap.remove(childAdd, "ST1");
+    LOG.log(Level.INFO, "OUT: {0}", strMap);
+    LOG.log(Level.INFO, "OUT: {0}", strMap.containsKey(childAdd));
+    LOG.log(Level.INFO, "OUT: {0}", strMap.containsKey(childDead));
+    LOG.log(Level.INFO, "OUT: {0}", strMap.contains(childAdd, "ST0"));
+    LOG.log(Level.INFO, "OUT: {0}", strMap.contains(childAdd, "ST2"));
+    strMap.remove(childAdd, "ST0");
+    LOG.log(Level.INFO, "OUT: {0}", strMap);
+    LOG.log(Level.INFO, "OUT: {0}", strMap.containsKey(childAdd));
+    LOG.log(Level.INFO, "OUT: {0}", strMap.isEmpty());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/CountingMap.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/CountingMap.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/CountingMap.java
new file mode 100644
index 0000000..58c3f5c
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/CountingMap.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A multiset-like helper: every {@code add} of a key bumps its count by one,
+ * every {@code remove} lowers it by one, and a key whose count reaches zero
+ * is dropped from the map.
+ *
+ * Backed by a plain {@code HashMap}; this class does no synchronization.
+ */
+public class CountingMap<L> {
+
+  private final Map<L, Integer> map = new HashMap<>();
+
+  /**
+   * @param value the key to look up
+   * @return {@code true} if {@code value} currently has a count
+   */
+  public boolean containsKey (final L value) {
+    return map.containsKey(value);
+  }
+
+  /**
+   * @param value the key to look up
+   * @return the current count of {@code value}, or 0 if it is not present
+   */
+  public int get (final L value) {
+    final Integer current = map.get(value);
+    if (current == null) {
+      return 0;
+    }
+    return current;
+  }
+
+  /**
+   * @return {@code true} if no key is currently counted
+   */
+  public boolean isEmpty () {
+    return map.isEmpty();
+  }
+
+  /**
+   * Forgets all keys and counts.
+   */
+  public void clear () {
+    map.clear();
+  }
+
+  /**
+   * Raises the count of {@code value} by one, starting from zero if absent.
+   *
+   * @param value the key to count
+   */
+  public void add (final L value) {
+    final Integer current = map.get(value);
+    if (current == null) {
+      map.put(value, 1);
+    } else {
+      map.put(value, current + 1);
+    }
+  }
+
+  /**
+   * Lowers the count of {@code value} by one and drops the key when the
+   * count reaches zero.
+   *
+   * @param value the key to uncount
+   * @return {@code false} if {@code value} was not present, else {@code true}
+   */
+  public boolean remove (final L value) {
+    final Integer current = map.get(value);
+    if (current == null) {
+      return false;
+    }
+    final int remaining = current - 1;
+    if (remaining == 0) {
+      map.remove(value);
+    } else {
+      map.put(value, remaining);
+    }
+    return true;
+  }
+
+  @Override
+  public String toString () {
+    return map.toString();
+  }
+
+  /**
+   * Small manual demo that logs the map after each of a fixed sequence of
+   * adds and removes.
+   *
+   * @param args unused
+   */
+  public static void main (final String[] args) {
+    final Logger LOG = Logger.getLogger(CountingMap.class.getName());
+    final CountingMap<String> strMap = new CountingMap<>();
+    final String[] toAdd = {"Hello", "World", "Hello", "Hello", "World!"};
+    for (final String word : toAdd) {
+      strMap.add(word);
+      LOG.log(Level.INFO, "OUT: {0}", strMap);
+    }
+    final String[] toRemove = {"Hello", "World"};
+    for (final String word : toRemove) {
+      strMap.remove(word);
+      LOG.log(Level.INFO, "OUT: {0}", strMap);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/CountingSemaphore.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/CountingSemaphore.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/CountingSemaphore.java
new file mode 100644
index 0000000..3014667
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/CountingSemaphore.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.utils;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Logger;
+
+/**
+ * A counter whose updates and waits are all performed while holding a
+ * monitor supplied by the caller.
+ *
+ * {@link #decrement()} wakes every thread waiting on that monitor once the
+ * counter reaches zero or below; {@link #await()} blocks until then.
+ */
+public class CountingSemaphore {
+
+  private static final Logger LOG = Logger.getLogger(CountingSemaphore.class.getName());
+
+  private final AtomicInteger counter;
+
+  private final String name;
+
+  private final Object lock;
+
+  private final int initCount;
+
+  /**
+   * @param initCount the starting value of the counter
+   * @param name      text prepended, without separator, to most log messages
+   * @param lock      the monitor used for synchronization and wait/notify
+   */
+  public CountingSemaphore (final int initCount, final String name, final Object lock) {
+    this.initCount = initCount;
+    this.name = name;
+    this.lock = lock;
+    this.counter = new AtomicInteger(initCount);
+    LOG.finest("Counter initialized to " + initCount);
+  }
+
+  /**
+   * @return the value the counter was constructed with
+   */
+  public int getInitialCount() {
+    return initCount;
+  }
+
+  /**
+   * Adds one to the counter while holding the lock.
+   *
+   * @return the counter value after the increment
+   */
+  public int increment () {
+    synchronized (lock) {
+      final int retVal = counter.incrementAndGet();
+      LOG.finest(name + "Incremented counter to " + retVal);
+      logStatus();
+      return retVal;
+    }
+  }
+
+  /** Logs how many workers are running versus still to run, derived from the counter. */
+  private void logStatus () {
+    final int yetToRun = counter.get();
+    final int curRunning = initCount - yetToRun;
+    LOG.fine(name + curRunning + " workers are running & " + yetToRun + " workers are yet to run");
+  }
+
+  /**
+   * Subtracts one from the counter while holding the lock and, when the
+   * result is zero or negative, notifies all threads waiting on the lock.
+   *
+   * @return the counter value after the decrement
+   */
+  public int decrement () {
+    synchronized (lock) {
+      final int retVal = counter.decrementAndGet();
+      LOG.finest(name + "Decremented counter to " + retVal);
+      if (retVal < 0) {
+        LOG.warning("Counter negative. More workers exist than you expected");
+      }
+      if (retVal <= 0) {
+        LOG.finest(name + "All workers are done with their task. Notifying waiting threads");
+        lock.notifyAll();
+      } else {
+        LOG.finest(name + "Some workers are not done yet");
+      }
+      logStatus();
+      return retVal;
+    }
+  }
+
+  /**
+   * @return the current counter value, read while holding the lock
+   */
+  public int get () {
+    synchronized (lock) {
+      return counter.get();
+    }
+  }
+
+  /**
+   * Blocks on the lock until the counter is zero or negative.
+   *
+   * @throws RuntimeException wrapping the {@code InterruptedException} if
+   *         the waiting thread is interrupted; the thread's interrupt status
+   *         is restored before the exception is thrown
+   */
+  public void await () {
+    synchronized (lock) {
+      LOG.finest(name + "Waiting for workers to be done");
+      // Loop guards against wake-ups that arrive while the counter is still positive.
+      while (counter.get() > 0) {
+        try {
+          lock.wait();
+          LOG.finest(name + "Notified with counter=" + counter.get());
+        } catch (final InterruptedException e) {
+          // wait() cleared the flag; set it again so callers up the stack can see the interrupt.
+          Thread.currentThread().interrupt();
+          throw new RuntimeException("InterruptedException while waiting for counting semaphore counter", e);
+        }
+      }
+      LOG.finest(name + "Returning from wait");
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ResettingCountDownLatch.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ResettingCountDownLatch.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ResettingCountDownLatch.java
new file mode 100644
index 0000000..30631c8
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ResettingCountDownLatch.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.utils;
+
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * A wrapper around {@link CountDownLatch} that can be re-armed with a new
+ * count after a wait has completed.
+ *
+ * The wrapped latch is replaced by {@link #awaitAndReset(int)}, so the field
+ * is {@code volatile}: threads calling {@link #countDown()} or
+ * {@link #await()} afterwards see the new latch rather than a stale one.
+ */
+public class ResettingCountDownLatch {
+  private volatile CountDownLatch latch;
+
+  /**
+   * @param initialCount the count of the first latch
+   */
+  public ResettingCountDownLatch (final int initialCount) {
+    latch = new CountDownLatch(initialCount);
+  }
+
+  /**
+   * Blocks until the current latch reaches zero.
+   *
+   * @throws RuntimeException wrapping the {@code InterruptedException} if
+   *         the waiting thread is interrupted; the interrupt status is
+   *         restored before the exception is thrown
+   */
+  public void await () {
+    awaitCurrentLatch();
+  }
+
+  /**
+   * Blocks until the current latch reaches zero, then installs a fresh
+   * latch with the given count. No new latch is installed if the wait is
+   * interrupted.
+   *
+   * @param resetCount the count of the replacement latch
+   * @throws RuntimeException wrapping the {@code InterruptedException} if
+   *         the waiting thread is interrupted; the interrupt status is
+   *         restored before the exception is thrown
+   */
+  public void awaitAndReset (final int resetCount) {
+    awaitCurrentLatch();
+    latch = new CountDownLatch(resetCount);
+  }
+
+  /**
+   * Counts the current latch down by one.
+   */
+  public void countDown () {
+    latch.countDown();
+  }
+
+  /** Waits on the latch currently installed, converting interruption to an unchecked exception. */
+  private void awaitCurrentLatch () {
+    try {
+      latch.await();
+    } catch (final InterruptedException e) {
+      // Re-assert the interrupt so code further up the stack can still observe it.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("InterruptedException while waiting for latch", e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/SetMap.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/SetMap.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/SetMap.java
new file mode 100644
index 0000000..6a4b424
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/SetMap.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.utils;
+
+import org.apache.reef.io.network.group.impl.driver.MsgKey;
+
+import java.util.*;
+
+/**
+ * Map from K to {@code Set<V>}.
+ *
+ * A key is present only while its set is non-empty: {@link #add} creates the
+ * set on first use and {@link #remove(Object, Object)} drops the key once
+ * its last value is removed. Backed by a plain {@code HashMap}; this class
+ * does no synchronization.
+ */
+public class SetMap<K, V> {
+  private final Map<K, Set<V>> map = new HashMap<>();
+
+  /**
+   * @param key the key to look up
+   * @return {@code true} if {@code key} currently has a set
+   */
+  public boolean containsKey(final K key) {
+    return map.containsKey(key);
+  }
+
+  /**
+   * @param key   the key to look up
+   * @param value the value to look for in the key's set
+   * @return {@code true} if {@code key} is present and its set holds {@code value}
+   */
+  public boolean contains(final K key, final V value) {
+    final Set<V> values = map.get(key);
+    return values != null && values.contains(value);
+  }
+
+  /**
+   * @param key the key to look up
+   * @return the set stored for {@code key} (the internal instance, not a
+   *         copy), or an immutable empty set if the key is absent
+   */
+  public Set<V> get(final K key) {
+    final Set<V> values = map.get(key);
+    if (values != null) {
+      return values;
+    }
+    return Collections.emptySet();
+  }
+
+  /**
+   * Adds {@code value} to the set of {@code key}, creating the set if needed.
+   *
+   * @param key   the key whose set is extended
+   * @param value the value to add
+   */
+  public void add(final K key, final V value) {
+    Set<V> values = map.get(key);
+    if (values == null) {
+      values = new HashSet<>();
+      map.put(key, values);
+    }
+    values.add(value);
+  }
+
+  /**
+   * Removes {@code value} from the set of {@code key} and drops the key if
+   * the set becomes empty.
+   *
+   * @param key   the key whose set is reduced
+   * @param value the value to remove
+   * @return {@code true} if the value was present and has been removed
+   */
+  public boolean remove(final K key, final V value) {
+    final Set<V> set = map.get(key);
+    if (set == null) {
+      return false;
+    }
+    final boolean retVal = set.remove(value);
+    if (set.isEmpty()) {
+      map.remove(key);
+    }
+    return retVal;
+  }
+
+  /**
+   * @param key the key to look up
+   * @return the number of values stored for {@code key}, 0 if absent
+   */
+  public int count(final K key) {
+    final Set<V> values = map.get(key);
+    return values == null ? 0 : values.size();
+  }
+
+  /**
+   * Removes {@code key} together with all of its values.
+   *
+   * @param key the key to drop
+   * @return the set that was stored for {@code key}, or {@code null} if absent
+   */
+  public Set<V> remove(final K key) {
+    return map.remove(key);
+  }
+
+  /**
+   * @return the key view of the backing map
+   */
+  public Set<K> keySet() {
+    return map.keySet();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/Utils.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/Utils.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/Utils.java
new file mode 100644
index 0000000..27811cb
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/Utils.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.utils;
+
+import org.apache.reef.io.network.Message;
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
+import org.apache.reef.tang.annotations.Name;
+
+import java.util.Iterator;
+
+/**
+ * Static helpers for building and unpacking
+ * {@link GroupCommunicationMessage} objects.
+ */
+public class Utils {
+
+  public static final byte[] EmptyByteArr = new byte[0];
+
+  /**
+   * Builds a {@link GroupCommunicationMessage} from the given parts, using
+   * the class names of {@code groupName} and {@code operName} as the group
+   * and operator names.
+   *
+   * @param groupName  class whose {@code getName()} is passed as group name
+   * @param operName   class whose {@code getName()} is passed as operator name
+   * @param msgType    the message type
+   * @param from       the source id
+   * @param srcVersion the source version
+   * @param to         the destination id
+   * @param dstVersion the destination version
+   * @param data       the payload arrays, passed through unchanged
+   * @return the newly constructed message
+   */
+  public static GroupCommunicationMessage bldVersionedGCM(final Class<? extends Name<String>> groupName,
+                                                          final Class<? extends Name<String>> operName,
+                                                          final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType,
+                                                          final String from, final int srcVersion,
+                                                          final String to, final int dstVersion,
+                                                          final byte[]... data) {
+    final String group = groupName.getName();
+    final String operator = operName.getName();
+    return new GroupCommunicationMessage(group, operator, msgType, from, srcVersion, to, dstVersion, data);
+  }
+
+  /**
+   * Loads the class with the given fully qualified name.
+   *
+   * @param className the name handed to {@code Class.forName}
+   * @return the loaded class, cast (unchecked) to a {@code Name<String>} subtype
+   * @throws RuntimeException wrapping the {@code ClassNotFoundException} if
+   *         no such class can be found
+   */
+  public static Class<? extends Name<String>> getClass(final String className) {
+    try {
+      // The cast is not verified at runtime; it only narrows the static type.
+      @SuppressWarnings("unchecked")
+      final Class<? extends Name<String>> loaded = (Class<? extends Name<String>>) Class.forName(className);
+      return loaded;
+    } catch (final ClassNotFoundException e) {
+      throw new RuntimeException("Unable to find class " + className, e);
+    }
+  }
+
+  /**
+   * @param className the class to describe, may be {@code null}
+   * @return the simple name of the class, or the literal {@code "NULL"} for
+   *         a {@code null} argument
+   */
+  public static String simpleName(final Class<?> className) {
+    return (className == null) ? "NULL" : className.getSimpleName();
+  }
+
+  /**
+   * @param gcm the message to read from
+   * @return the first element of {@code gcm.getData()} when
+   *         {@code gcm.getMsgsCount()} is exactly 1, otherwise {@code null}
+   */
+  public static byte[] getData(final GroupCommunicationMessage gcm) {
+    if (gcm.getMsgsCount() != 1) {
+      return null;
+    }
+    return gcm.getData()[0];
+  }
+
+  /**
+   * Extracts the single {@link GroupCommunicationMessage} carried by
+   * {@code msg}.
+   *
+   * @param msg the network message whose data is iterated
+   * @return the one and only element of {@code msg.getData()}
+   * @throws RuntimeException if the data holds no element or more than one
+   */
+  public static GroupCommunicationMessage getGCM(final Message<GroupCommunicationMessage> msg) {
+    final Iterator<GroupCommunicationMessage> elements = msg.getData().iterator();
+    if (!elements.hasNext()) {
+      throw new RuntimeException("Expecting exactly one GCM object inside Message but found none");
+    }
+    final GroupCommunicationMessage only = elements.next();
+    if (elements.hasNext()) {
+      throw new RuntimeException("Expecting exactly one GCM object inside Message but found more");
+    }
+    return only;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/package-info.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/package-info.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/package-info.java
new file mode 100644
index 0000000..4946182
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/package-info.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Elastic Group Communications for REEF.
+ *
+ * Provides MPI style Group Communication operators for collective 
communication
+ * between tasks. These should be primarily used for any form of
+ * task to task messaging along with the point to point communication
+ * provided by {@link org.apache.reef.io.network.impl.NetworkService}
+ *
+ * The interfaces for the operators are in
+ * org.apache.reef.io.network.group.api.operators.
+ * The fluent way to describe these operators is available in
+ * org.apache.reef.io.network.group.config.
+ * The implementation of these operators is available in
+ * org.apache.reef.io.network.group.impl.
+ * Currently only a basic implementation is available.
+ */
+package org.apache.reef.io.network.group;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServer.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServer.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServer.java
index 5a4c765..0737286 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServer.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServer.java
@@ -20,29 +20,11 @@
 package org.apache.reef.io.network.naming;
 
 import org.apache.reef.io.naming.NameAssignment;
-import org.apache.reef.io.network.naming.serialization.*;
-import org.apache.reef.tang.annotations.DefaultImplementation;
-import org.apache.reef.tang.annotations.Parameter;
-import org.apache.reef.wake.EventHandler;
 import org.apache.reef.wake.Identifier;
-import org.apache.reef.wake.IdentifierFactory;
 import org.apache.reef.wake.Stage;
-import org.apache.reef.wake.impl.MultiEventHandler;
-import org.apache.reef.wake.impl.SyncStage;
-import org.apache.reef.wake.remote.Codec;
-import org.apache.reef.wake.remote.NetUtils;
-import org.apache.reef.wake.remote.impl.TransportEvent;
-import org.apache.reef.wake.remote.transport.Transport;
-import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
-import org.apache.reef.webserver.AvroReefServiceInfo;
-import org.apache.reef.webserver.ReefEventStateManager;
 
-import javax.inject.Inject;
-import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.*;
-import java.util.logging.Level;
-import java.util.logging.Logger;
+import java.util.List;
 
 /**
  * Naming server interface

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java
index 4cb0cc6..ff241a0 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java
@@ -24,7 +24,6 @@ import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.wake.EventHandler;
 import org.apache.reef.wake.Identifier;
 import org.apache.reef.wake.IdentifierFactory;
-import org.apache.reef.wake.Stage;
 import org.apache.reef.wake.impl.MultiEventHandler;
 import org.apache.reef.wake.impl.SyncStage;
 import org.apache.reef.wake.remote.Codec;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/Utils.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/Utils.java 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/Utils.java
new file mode 100644
index 0000000..3e58f36
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/Utils.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.util;
+
+import com.google.protobuf.ByteString;
+import org.apache.commons.lang.StringUtils;
+import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
+import org.apache.reef.wake.ComparableIdentifier;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+
+import java.net.Inet4Address;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * Static helpers for parsing identifier lists, splitting element counts
+ * and building group communication protobuf messages.
+ */
+public class Utils {
+
+  private static final String DELIMITER = "-";
+
+  /**
+   * Splits {@code ids} on the delimiter and turns each trimmed token into
+   * an {@link Identifier} via {@code factory}.
+   *
+   * TODO: Merge with parseListCmp() into one generic implementation.
+   *
+   * @param ids     the delimiter-separated identifier string
+   * @param factory the factory used to create each identifier
+   * @return the identifiers in the order they appear in {@code ids}
+   */
+  public static List<Identifier> parseList(
+      final String ids, final IdentifierFactory factory) {
+    final List<Identifier> result = new ArrayList<>();
+    for (final String token : ids.split(DELIMITER)) {
+      result.add(factory.getNewInstance(token.trim()));
+    }
+    return result;
+  }
+
+  /**
+   * Same as {@link #parseList(String, IdentifierFactory)} but casts every
+   * created identifier to {@link ComparableIdentifier}.
+   *
+   * TODO: Merge with parseList() into one generic implementation.
+   *
+   * @param ids     the delimiter-separated identifier string
+   * @param factory the factory used to create each identifier
+   * @return the identifiers in the order they appear in {@code ids}
+   */
+  public static List<ComparableIdentifier> parseListCmp(
+      final String ids, final IdentifierFactory factory) {
+    final List<ComparableIdentifier> result = new ArrayList<>();
+    for (final String token : ids.split(DELIMITER)) {
+      result.add((ComparableIdentifier) factory.getNewInstance(token.trim()));
+    }
+    return result;
+  }
+
+  /**
+   * @param ids the identifiers to join
+   * @return the identifiers joined with the same delimiter the parse
+   *         methods split on
+   */
+  public static String listToString(final List<ComparableIdentifier> ids) {
+    return StringUtils.join(ids, DELIMITER);
+  }
+
+  /**
+   * Splits {@code elemSize} elements as evenly as possible over
+   * {@code childSize} children: the first {@code elemSize % childSize}
+   * entries are one larger than the rest.
+   *
+   * @param elemSize  the total number of elements
+   * @param childSize the number of children; integer division by this value
+   *                  means 0 results in an {@code ArithmeticException}
+   * @return an unmodifiable list of {@code childSize} counts
+   */
+  public static List<Integer> createUniformCounts(final int elemSize, final int childSize) {
+    final int remainder = elemSize % childSize;
+    final int quotient = elemSize / childSize;
+    final ArrayList<Integer> result = new ArrayList<>(childSize);
+    result.addAll(Collections.nCopies(remainder, quotient + 1));
+    result.addAll(Collections.nCopies(childSize - remainder, quotient));
+    return Collections.unmodifiableList(result);
+  }
+
+  /**
+   * Orders IPv4 addresses octet by octet, except that addresses whose first
+   * octet is 127 sort after all others.
+   */
+  private static class AddressComparator implements Comparator<Inet4Address> {
+    @Override
+    public int compare(final Inet4Address aa, final Inet4Address ba) {
+      final byte[] a = aa.getAddress();
+      final byte[] b = ba.getAddress();
+      // local subnet comes after all else.
+      if (a[0] == 127 && b[0] != 127) {
+        return 1;
+      }
+      if (a[0] != 127 && b[0] == 127) {
+        return -1;
+      }
+      for (int i = 0; i < 4; i++) {
+        // Java bytes are signed; mask so that octets 128..255 compare as
+        // larger than 0..127 instead of as negative numbers.
+        final int octetA = a[i] & 0xFF;
+        final int octetB = b[i] & 0xFF;
+        if (octetA < octetB) {
+          return -1;
+        }
+        if (octetA > octetB) {
+          return 1;
+        }
+      }
+      return 0;
+    }
+  }
+
+  /**
+   * Builds a {@code GroupCommMessage} with one message body per element.
+   *
+   * @param msgType  the message type to set
+   * @param from     the source; its {@code toString()} becomes the source id
+   * @param to       the destination; its {@code toString()} becomes the
+   *                 destination id
+   * @param elements the payloads, each copied into its own message body
+   * @return the built message
+   */
+  public static ReefNetworkGroupCommProtos.GroupCommMessage bldGCM(
+      final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType,
+      final Identifier from, final Identifier to, final byte[]... elements) {
+
+    final ReefNetworkGroupCommProtos.GroupCommMessage.Builder gcmBuilder =
+        ReefNetworkGroupCommProtos.GroupCommMessage.newBuilder()
+            .setType(msgType)
+            .setSrcid(from.toString())
+            .setDestid(to.toString());
+
+    final ReefNetworkGroupCommProtos.GroupMessageBody.Builder bodyBuilder =
+        ReefNetworkGroupCommProtos.GroupMessageBody.newBuilder();
+
+    // The body builder is reused; setData overwrites the previous payload
+    // before each build().
+    for (final byte[] element : elements) {
+      bodyBuilder.setData(ByteString.copyFrom(element));
+      gcmBuilder.addMsgs(bodyBuilder.build());
+    }
+
+    return gcmBuilder.build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/RamMap.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/RamMap.java 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/RamMap.java
index 35a20e1..ffceb5d 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/RamMap.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/RamMap.java
@@ -33,12 +33,11 @@ import java.util.concurrent.ConcurrentSkipListMap;
  * default ExternalMap provided by StorageManagerRam.
  */
 public class RamMap<T> implements ExternalMap<T> {
-  private final ConcurrentSkipListMap<CharSequence, T> map
-      = new ConcurrentSkipListMap<CharSequence, T>();
+
+  private final ConcurrentSkipListMap<CharSequence, T> map = new 
ConcurrentSkipListMap<CharSequence, T>();
 
   @Inject
   public RamMap(RamStorageService ramStore) {
-    //this.localStore = localStore;
   }
 
   @Override
@@ -70,5 +69,4 @@ public class RamMap<T> implements ExternalMap<T> {
   public Iterable<Entry<CharSequence, T>> getAll(Set<? extends CharSequence> 
keys) {
     return new GetAllIterable<>(keys, this);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/proto/group_comm_protocol.proto
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/proto/group_comm_protocol.proto 
b/lang/java/reef-io/src/main/proto/group_comm_protocol.proto
new file mode 100644
index 0000000..7e2f60f
--- /dev/null
+++ b/lang/java/reef-io/src/main/proto/group_comm_protocol.proto
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+option java_package = "org.apache.reef.io.network.proto";
+option java_outer_classname = "ReefNetworkGroupCommProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+// Envelope for one group-communication message: a type tag, source and
+// destination ids, optional group/operator/version metadata, and zero or
+// more payload bodies.
+message GroupCommMessage {
+  // Kind of message being sent. The numeric values are what gets
+  // serialized, so existing numbers must not be reassigned.
+  enum Type { 
+       Scatter=1; 
+       Gather=2; 
+       Broadcast=3; 
+       Reduce=4; 
+       AllGather=5; 
+       AllReduce=6; 
+       ReduceScatter=7; 
+       SourceDead=8; 
+       SourceAdd=9; 
+       ParentAdd=10; 
+       ChildAdd=11;
+       ParentDead=12; 
+       ChildDead=13;
+       ParentAdded=14; 
+       ChildAdded=15;
+       ParentRemoved=16; 
+       ChildRemoved=17;
+       TopologySetup=18;
+       UpdateTopology=19;
+       TopologyUpdated=20;
+       TopologyChanges=21;
+  }
+
+  // identifies which field is filled in
+  required Type type = 1;
+  
+  // Source and destination identifiers, in string form (both required).
+  required string srcid = 2;
+  required string destid = 3;
+  // Optional group and operator names.
+  optional string groupname = 4;
+  optional string operatorname = 5;
+  // NOTE(review): presumably the destination-side and source-side version
+  // numbers of the message - confirm against the code that sets them.
+  optional int32 version = 6;
+  optional int32 srcVersion = 7;
+  // Payload bodies; may be empty.
+  repeated GroupMessageBody msgs = 8;
+}
+
+// One payload entry of a GroupCommMessage: a required, uninterpreted
+// byte string.
+message GroupMessageBody {
+  required bytes data = 1;
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/test/java/org/apache/reef/io/network/group/GroupCommunicationMessageCodecTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/group/GroupCommunicationMessageCodecTest.java
 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/group/GroupCommunicationMessageCodecTest.java
new file mode 100644
index 0000000..a83daa6
--- /dev/null
+++ 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/group/GroupCommunicationMessageCodecTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group;
+
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessageCodec;
+import org.apache.reef.io.network.group.impl.utils.Utils;
+import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.util.Random;
+
+/**
+ * Tests for {@link GroupCommunicationMessageCodec}: instantiation through
+ * Tang and encode/decode round trips, both via byte arrays and via streams.
+ */
+public class GroupCommunicationMessageCodecTest {
+
+  /**
+   * Named parameter passed as the group name class when building the
+   * test message.
+   */
+  @NamedParameter
+  class GroupName implements Name<String> {
+  }
+
+  /**
+   * Named parameter passed as the operator name class when building the
+   * test message.
+   */
+  @NamedParameter
+  class OperName implements Name<String> {
+  }
+
+  /**
+   * Checks that a fresh Tang injector can produce a non-null
+   * {@link GroupCommunicationMessageCodec}.
+   *
+   * @throws InjectionException if Tang cannot instantiate the codec
+   */
+  @Test(timeout = 100)
+  public final void testInstantiation() throws InjectionException {
+    final GroupCommunicationMessageCodec codec = 
Tang.Factory.getTang().newInjector().getInstance(GroupCommunicationMessageCodec.class);
+    
Assert.assertNotNull("tang.getInstance(GroupCommunicationMessageCodec.class): 
", codec);
+  }
+
+  /**
+   * Builds a {@code ChildAdd} message carrying 100 random bytes and checks
+   * that it survives two round trips unchanged: {@code encode}/{@code decode}
+   * on byte arrays, and {@code encodeToStream}/{@code decodeFromStream} on
+   * data streams.
+   */
+  @Test(timeout = 100)
+  public final void testEncodeDecode() {
+    final Random r = new Random();
+    final byte[] data = new byte[100];
+    r.nextBytes(data);
+    final GroupCommunicationMessage expMsg = 
Utils.bldVersionedGCM(GroupName.class, OperName.class, 
ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "From", 0, "To", 1, 
data);
+    final GroupCommunicationMessageCodec codec = new 
GroupCommunicationMessageCodec();
+    // Round trip 1: byte-array encode/decode.
+    final GroupCommunicationMessage actMsg1 = 
codec.decode(codec.encode(expMsg));
+    Assert.assertEquals("decode(encode(msg)): ", expMsg, actMsg1);
+    // Round trip 2: write to an in-memory stream, then read it back.
+    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    final DataOutputStream daos = new DataOutputStream(baos);
+    codec.encodeToStream(expMsg, daos);
+    final GroupCommunicationMessage actMsg2 = codec.decodeFromStream(new 
DataInputStream(new ByteArrayInputStream(baos.toByteArray())));
+    Assert.assertEquals("decodeFromStream(encodeToStream(msg)): ", expMsg, 
actMsg2);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/TestUtils.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/TestUtils.java
 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/TestUtils.java
new file mode 100644
index 0000000..a41cf04
--- /dev/null
+++ 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/TestUtils.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.util;
+
+import com.google.protobuf.ByteString;
+import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
+import org.apache.reef.wake.Identifier;
+
+/**
+ * Static helpers for tests that build and classify
+ * {@code GroupCommMessage} protobufs.
+ */
+public class TestUtils {
+  /**
+   * Builds a {@code GroupCommMessage} of the given type. The source and
+   * destination ids are the string forms of {@code from} and {@code to};
+   * each entry of {@code elements} becomes one {@code GroupMessageBody},
+   * added in argument order.
+   *
+   * @param msgType  the message type to set
+   * @param from     identifier whose {@code toString()} becomes the source id
+   * @param to       identifier whose {@code toString()} becomes the destination id
+   * @param elements payload byte arrays, one message body per array
+   * @return the built message
+   */
+  public static ReefNetworkGroupCommProtos.GroupCommMessage bldGCM(final 
ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType, final Identifier 
from, final Identifier to, final byte[]... elements) {
+    final ReefNetworkGroupCommProtos.GroupCommMessage.Builder GCMBuilder = 
ReefNetworkGroupCommProtos.GroupCommMessage.newBuilder();
+    GCMBuilder.setType(msgType);
+    GCMBuilder.setSrcid(from.toString());
+    GCMBuilder.setDestid(to.toString());
+    // The same body builder is reused; its data is overwritten per element.
+    final ReefNetworkGroupCommProtos.GroupMessageBody.Builder bodyBuilder = 
ReefNetworkGroupCommProtos.GroupMessageBody.newBuilder();
+    for (final byte[] element : elements) {
+      bodyBuilder.setData(ByteString.copyFrom(element));
+      GCMBuilder.addMsgs(bodyBuilder.build());
+    }
+    final ReefNetworkGroupCommProtos.GroupCommMessage msg = GCMBuilder.build();
+    return msg;
+  }
+
+  /**
+   * Tells whether a message type counts as a control message.
+   *
+   * @param type the message type to classify
+   * @return {@code false} for {@code AllGather}, {@code AllReduce},
+   *         {@code Broadcast}, {@code Gather}, {@code Reduce},
+   *         {@code ReduceScatter} and {@code Scatter}; {@code true} for
+   *         every other type
+   */
+  public static boolean controlMessage(final 
ReefNetworkGroupCommProtos.GroupCommMessage.Type type) {
+
+    switch (type) {
+      case AllGather:
+      case AllReduce:
+      case Broadcast:
+      case Gather:
+      case Reduce:
+      case ReduceScatter:
+      case Scatter:
+        return false;
+
+      default:
+        return true;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
 
b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
index 96cf7d4..17765f1 100644
--- 
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
+++ 
b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
@@ -28,8 +28,6 @@ import org.apache.reef.io.network.naming.NameServerImpl;
 import org.apache.reef.io.network.util.StringIdentifierFactory;
 import org.apache.reef.services.network.util.Monitor;
 import org.apache.reef.services.network.util.StringCodec;
-import org.apache.reef.tang.Injector;
-import org.apache.reef.tang.Tang;
 import org.apache.reef.wake.EventHandler;
 import org.apache.reef.wake.Identifier;
 import org.apache.reef.wake.IdentifierFactory;
@@ -37,7 +35,7 @@ import org.apache.reef.wake.remote.NetUtils;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
-import org.junit.Assert;
+
 import java.net.InetSocketAddress;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -55,8 +53,6 @@ public class NetworkServiceTest {
 
   /**
    * NetworkService messaging test
-   *
-   * @throws Exception
    */
   @Test
   public void testMessagingNetworkService() throws Exception {
@@ -113,8 +109,6 @@ public class NetworkServiceTest {
 
   /**
    * NetworkService messaging rate benchmark
-   *
-   * @throws Exception
    */
   @Test
   public void testMessagingNetworkServiceRate() throws Exception {
@@ -187,8 +181,6 @@ public class NetworkServiceTest {
 
   /**
    * NetworkService messaging rate benchmark
-   *
-   * @throws Exception
    */
   @Test
   public void testMessagingNetworkServiceRateDisjoint() throws Exception {
@@ -372,8 +364,6 @@ public class NetworkServiceTest {
 
   /**
    * NetworkService messaging rate benchmark
-   *
-   * @throws Exception
    */
   @Test
   public void testMessagingNetworkServiceBatchingRate() throws Exception {
@@ -452,8 +442,6 @@ public class NetworkServiceTest {
 
   /**
    * Test message handler
-   *
-   * @param <T> type
    */
   class MessageHandler<T> implements EventHandler<Message<T>> {
 
@@ -470,13 +458,17 @@ public class NetworkServiceTest {
 
     @Override
     public void onNext(Message<T> value) {
+
       count.incrementAndGet();
 
-      //System.out.print(name + " received " + value.getData() + " from " + 
value.getSrcId() + " to " + value.getDestId());
-      for (T obj : value.getData()) {
-        // System.out.print(" data: " + obj);
+      LOG.log(Level.FINEST,
+          "OUT: {0} received {1} from {2} to {3}",
+          new Object[] { name, value.getData(), value.getSrcId(), 
value.getDestId() });
+
+      for (final T obj : value.getData()) {
+        LOG.log(Level.FINEST, "OUT: data: {0}", obj);
       }
-      //LOG.log(Level.FINEST, );
+
       if (count.get() == expected) {
         monitor.mnotify();
       }

Reply via email to