http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ReduceReceiver.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ReduceReceiver.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ReduceReceiver.java
new file mode 100644
index 0000000..3a6791c
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ReduceReceiver.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.operators;
+
+import org.apache.reef.driver.parameters.DriverIdentifier;
+import org.apache.reef.driver.task.TaskConfigurationOptions;
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.exception.ParentDeadException;
+import org.apache.reef.io.network.group.api.operators.Reduce;
+import org.apache.reef.io.network.group.api.operators.Reduce.ReduceFunction;
+import org.apache.reef.io.network.impl.NetworkService;
+import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler;
+import 
org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient;
+import org.apache.reef.io.network.group.api.task.OperatorTopology;
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import org.apache.reef.io.network.group.impl.config.parameters.*;
+import org.apache.reef.io.network.group.impl.task.OperatorTopologyImpl;
+import org.apache.reef.io.network.group.impl.utils.Utils;
+import org.apache.reef.io.serialization.Codec;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.Identifier;
+
+import javax.inject.Inject;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Logger;
+
+/**
+ * Receiving (root) side of the Reduce operator: waits for the values sent by its children in the
+ * operator topology and combines them with the configured {@link ReduceFunction}.
+ *
+ * @param <T> type of the values being reduced
+ */
+public class ReduceReceiver<T> implements Reduce.Receiver<T>, EventHandler<GroupCommunicationMessage> {
+
+  private static final Logger LOG = Logger.getLogger(ReduceReceiver.class.getName());
+
+  private final Class<? extends Name<String>> groupName;
+  private final Class<? extends Name<String>> operName;
+  private final CommGroupNetworkHandler commGroupNetworkHandler;
+  private final Codec<T> dataCodec;
+  private final NetworkService<GroupCommunicationMessage> netService;
+  private final Sender sender;
+  private final ReduceFunction<T> reduceFunction;
+
+  private final OperatorTopology topology;
+
+  private final CommunicationGroupServiceClient commGroupClient;
+
+  /** Ensures {@code commGroupClient.initialize()} runs only once, on the first {@link #reduce()}. */
+  private final AtomicBoolean init = new AtomicBoolean(false);
+
+  private final int version;
+
+  /**
+   * Creates the operator, builds its topology and registers it with the group's network handler.
+   *
+   * @param groupName               name of the communication group this operator belongs to
+   * @param operName                name of this operator
+   * @param selfId                  identifier of the task running this operator
+   * @param dataCodec               codec for the reduced values, handed to the topology
+   * @param reduceFunction          function used to combine the children's values
+   * @param driverId                identifier of the driver
+   * @param version                 version passed to the topology and reported by {@link #getVersion()}
+   * @param commGroupNetworkHandler handler this operator registers itself with for incoming messages
+   * @param netService              network service used by the internal {@link Sender}
+   * @param commGroupClient         group client, initialized lazily on the first {@link #reduce()}
+   */
+  @Inject
+  public ReduceReceiver(@Parameter(CommunicationGroupName.class) final String groupName,
+                        @Parameter(OperatorName.class) final String operName,
+                        @Parameter(TaskConfigurationOptions.Identifier.class) final String selfId,
+                        @Parameter(DataCodec.class) final Codec<T> dataCodec,
+                        @Parameter(ReduceFunctionParam.class) final ReduceFunction<T> reduceFunction,
+                        @Parameter(DriverIdentifier.class) final String driverId,
+                        @Parameter(TaskVersion.class) final int version,
+                        final CommGroupNetworkHandler commGroupNetworkHandler,
+                        final NetworkService<GroupCommunicationMessage> netService,
+                        final CommunicationGroupServiceClient commGroupClient) {
+    super();
+    this.version = version;
+    LOG.finest(operName + " has CommGroupHandler-" + commGroupNetworkHandler.toString());
+    this.groupName = Utils.getClass(groupName);
+    this.operName = Utils.getClass(operName);
+    this.dataCodec = dataCodec;
+    this.reduceFunction = reduceFunction;
+    this.commGroupNetworkHandler = commGroupNetworkHandler;
+    this.netService = netService;
+    this.commGroupClient = commGroupClient;
+    this.sender = new Sender(this.netService);
+    this.topology = new OperatorTopologyImpl(this.groupName, this.operName, selfId, driverId, sender, version);
+    // Register last: registering publishes 'this' to the handler (possibly used from another thread),
+    // so every final field must already be assigned at this point.
+    this.commGroupNetworkHandler.register(this.operName, this);
+  }
+
+  @Override
+  public int getVersion() {
+    return version;
+  }
+
+  /**
+   * Initializes the underlying operator topology.
+   *
+   * @throws ParentDeadException propagated from the topology
+   */
+  @Override
+  public void initialize() throws ParentDeadException {
+    topology.initialize();
+  }
+
+  @Override
+  public Class<? extends Name<String>> getOperName() {
+    return operName;
+  }
+
+  @Override
+  public Class<? extends Name<String>> getGroupName() {
+    return groupName;
+  }
+
+  @Override
+  public String toString() {
+    return "ReduceReceiver:" + Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + ":" + version;
+  }
+
+  /**
+   * Hands every message routed to this operator to the topology.
+   *
+   * @param msg message received for this operator
+   */
+  @Override
+  public void onNext(final GroupCommunicationMessage msg) {
+    topology.handle(msg);
+  }
+
+  /**
+   * Blocks until the children have sent their values and returns their reduction.
+   * The communication group client is initialized on the first call.
+   *
+   * @return the value reduced from the children's contributions (possibly {@code null})
+   * @throws RuntimeException wrapping a {@link ParentDeadException} raised by the topology
+   */
+  @Override
+  public T reduce() throws InterruptedException, NetworkException {
+    LOG.entering("ReduceReceiver", "reduce", this);
+    LOG.fine("I am " + this);
+
+    if (init.compareAndSet(false, true)) {
+      commGroupClient.initialize();
+    }
+    // I am root
+    LOG.fine(this + " Waiting to receive reduced value");
+    // Wait for children to send
+    final T redVal;
+    try {
+      redVal = topology.recvFromChildren(reduceFunction, dataCodec);
+    } catch (final ParentDeadException e) {
+      throw new RuntimeException("ParentDeadException", e);
+    }
+    LOG.fine(this + " Received Reduced value: " + (redVal != null ? redVal : "NULL"));
+    LOG.exiting("ReduceReceiver", "reduce", Arrays.toString(new Object[]{redVal}));
+    return redVal;
+  }
+
+  /**
+   * Not supported by this implementation.
+   *
+   * @param order requested order of the participants (ignored)
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public T reduce(final List<? extends Identifier> order) throws InterruptedException, NetworkException {
+    throw new UnsupportedOperationException("ReduceReceiver does not support reduce with an explicit order");
+  }
+
+  @Override
+  public ReduceFunction<T> getReduceFunction() {
+    return reduceFunction;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ReduceSender.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ReduceSender.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ReduceSender.java
new file mode 100644
index 0000000..505f072
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/ReduceSender.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.operators;
+
+import org.apache.reef.driver.parameters.DriverIdentifier;
+import org.apache.reef.driver.task.TaskConfigurationOptions;
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.exception.ParentDeadException;
+import org.apache.reef.io.network.group.api.operators.Reduce;
+import org.apache.reef.io.network.group.api.operators.Reduce.ReduceFunction;
+import org.apache.reef.io.network.impl.NetworkService;
+import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler;
+import 
org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient;
+import org.apache.reef.io.network.group.api.task.OperatorTopology;
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import org.apache.reef.io.network.group.impl.config.parameters.*;
+import org.apache.reef.io.network.group.impl.task.OperatorTopologyImpl;
+import org.apache.reef.io.network.group.impl.utils.Utils;
+import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
+import org.apache.reef.io.serialization.Codec;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Sending (non-root) side of the Reduce operator: combines its own value with the value reduced
+ * from its children and forwards the result to its parent in the operator topology.
+ *
+ * @param <T> type of the values being reduced
+ */
+public class ReduceSender<T> implements Reduce.Sender<T>, EventHandler<GroupCommunicationMessage> {
+
+  private static final Logger LOG = Logger.getLogger(ReduceSender.class.getName());
+
+  private final Class<? extends Name<String>> groupName;
+  private final Class<? extends Name<String>> operName;
+  private final CommGroupNetworkHandler commGroupNetworkHandler;
+  private final Codec<T> dataCodec;
+  private final NetworkService<GroupCommunicationMessage> netService;
+  private final Sender sender;
+  private final ReduceFunction<T> reduceFunction;
+
+  private final OperatorTopology topology;
+
+  private final CommunicationGroupServiceClient commGroupClient;
+
+  /** Ensures {@code commGroupClient.initialize()} runs only once, on the first {@link #send}. */
+  private final AtomicBoolean init = new AtomicBoolean(false);
+
+  private final int version;
+
+  /**
+   * Creates the operator, builds its topology and registers it with the group's network handler.
+   *
+   * @param groupName               name of the communication group this operator belongs to
+   * @param operName                name of this operator
+   * @param selfId                  identifier of the task running this operator
+   * @param dataCodec               codec used to encode values sent to the parent
+   * @param reduceFunction          function used to combine local and children's values
+   * @param driverId                identifier of the driver
+   * @param version                 version passed to the topology and reported by {@link #getVersion()}
+   * @param commGroupNetworkHandler handler this operator registers itself with for incoming messages
+   * @param netService              network service used by the internal {@link Sender}
+   * @param commGroupClient         group client, initialized lazily on the first {@link #send}
+   */
+  @Inject
+  public ReduceSender(
+      final @Parameter(CommunicationGroupName.class) String groupName,
+      final @Parameter(OperatorName.class) String operName,
+      final @Parameter(TaskConfigurationOptions.Identifier.class) String selfId,
+      final @Parameter(DataCodec.class) Codec<T> dataCodec,
+      final @Parameter(ReduceFunctionParam.class) ReduceFunction<T> reduceFunction,
+      final @Parameter(DriverIdentifier.class) String driverId,
+      final @Parameter(TaskVersion.class) int version,
+      final CommGroupNetworkHandler commGroupNetworkHandler,
+      final NetworkService<GroupCommunicationMessage> netService,
+      final CommunicationGroupServiceClient commGroupClient) {
+
+    super();
+
+    LOG.log(Level.FINEST, "{0} has CommGroupHandler-{1}",
+        new Object[]{operName, commGroupNetworkHandler});
+
+    this.version = version;
+    this.groupName = Utils.getClass(groupName);
+    this.operName = Utils.getClass(operName);
+    this.dataCodec = dataCodec;
+    this.reduceFunction = reduceFunction;
+    this.commGroupNetworkHandler = commGroupNetworkHandler;
+    this.netService = netService;
+    this.commGroupClient = commGroupClient;
+    this.sender = new Sender(this.netService);
+    this.topology = new OperatorTopologyImpl(this.groupName, this.operName, selfId, driverId, sender, version);
+    // Register last: registering publishes 'this' to the handler (possibly used from another thread),
+    // so every final field must already be assigned at this point.
+    this.commGroupNetworkHandler.register(this.operName, this);
+  }
+
+  @Override
+  public int getVersion() {
+    return version;
+  }
+
+  /**
+   * Initializes the underlying operator topology.
+   *
+   * @throws ParentDeadException propagated from the topology
+   */
+  @Override
+  public void initialize() throws ParentDeadException {
+    topology.initialize();
+  }
+
+  @Override
+  public Class<? extends Name<String>> getOperName() {
+    return operName;
+  }
+
+  @Override
+  public Class<? extends Name<String>> getGroupName() {
+    return groupName;
+  }
+
+  @Override
+  public String toString() {
+    // Same "<OperatorType>:<group>:<operator>:<version>" shape as ReduceReceiver.toString().
+    return "ReduceSender:" + Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + ":" + version;
+  }
+
+  /**
+   * Hands every message routed to this operator to the topology.
+   *
+   * @param msg message received for this operator
+   */
+  @Override
+  public void onNext(final GroupCommunicationMessage msg) {
+    topology.handle(msg);
+  }
+
+  /**
+   * Waits for the children's reduced value, combines it with {@code myData} using the reduce
+   * function and sends the result to the parent. The communication group client is initialized
+   * on the first call.
+   *
+   * @param myData this task's contribution to the reduction
+   * @throws RuntimeException wrapping a {@link ParentDeadException} raised by the topology
+   */
+  @Override
+  public void send(final T myData) throws NetworkException, InterruptedException {
+    LOG.entering("ReduceSender", "send", new Object[]{this, myData});
+    LOG.fine("I am " + this);
+
+    if (init.compareAndSet(false, true)) {
+      commGroupClient.initialize();
+    }
+    // I am an intermediate node or leaf.
+    LOG.finest("Waiting for children");
+    // Wait for children to send
+    try {
+      final T reducedValueOfChildren = topology.recvFromChildren(reduceFunction, dataCodec);
+      final List<T> vals = new ArrayList<>(2);
+      vals.add(myData);
+      // NOTE(review): presumably null when there is nothing from children (e.g. a leaf) — confirm.
+      if (reducedValueOfChildren != null) {
+        vals.add(reducedValueOfChildren);
+      }
+      final T reducedValue = reduceFunction.apply(vals);
+      LOG.fine(this + " Sending local " + reducedValue + " to parent");
+      topology.sendToParent(dataCodec.encode(reducedValue), ReefNetworkGroupCommProtos.GroupCommMessage.Type.Reduce);
+    } catch (final ParentDeadException e) {
+      throw new RuntimeException("ParentDeadException", e);
+    }
+    LOG.exiting("ReduceSender", "send", Arrays.toString(new Object[]{this, myData}));
+  }
+
+  @Override
+  public ReduceFunction<T> getReduceFunction() {
+    return reduceFunction;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/Sender.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/Sender.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/Sender.java
new file mode 100644
index 0000000..b124906
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/Sender.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.operators;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.Connection;
+import 
org.apache.reef.io.network.group.api.operators.AbstractGroupCommOperator;
+import org.apache.reef.io.network.impl.NetworkService;
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import org.apache.reef.io.network.util.StringIdentifierFactory;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+
+import java.util.Arrays;
+import java.util.logging.Logger;
+
+/**
+ * Delivers {@link GroupCommunicationMessage}s to their destination through a {@link NetworkService}.
+ */
+public class Sender extends AbstractGroupCommOperator {
+
+  private static final Logger LOG = Logger.getLogger(Sender.class.getName());
+
+  /** Converts destination id strings into network {@link Identifier}s. */
+  private final IdentifierFactory idFac = new StringIdentifierFactory();
+  private final NetworkService<GroupCommunicationMessage> netService;
+
+  /**
+   * @param netService network service over which all messages are written
+   */
+  public Sender(final NetworkService<GroupCommunicationMessage> netService) {
+    this.netService = netService;
+  }
+
+  /**
+   * Sends a message to the destination stored in the message itself ({@code msg.getDestid()}).
+   *
+   * @param msg message to deliver
+   * @throws NetworkException propagated from the network connection
+   */
+  public void send(final GroupCommunicationMessage msg) throws NetworkException {
+    LOG.entering("Sender", "send", msg);
+    send(msg, msg.getDestid());
+    LOG.exiting("Sender", "send", msg);
+  }
+
+  /**
+   * Obtains a connection to {@code dest} from the network service, opens it and writes the message.
+   * The connection is not closed here.
+   * NOTE(review): presumably the network service owns/caches connections per destination — confirm
+   * before adding a close().
+   *
+   * @param msg  message to deliver
+   * @param dest string identifier of the destination
+   * @throws NetworkException propagated from the network connection
+   */
+  public void send(final GroupCommunicationMessage msg, final String dest) throws NetworkException {
+    LOG.entering("Sender", "send", new Object[]{msg, dest});
+    final Identifier target = idFac.getNewInstance(dest);
+    final Connection<GroupCommunicationMessage> connection = netService.newConnection(target);
+    connection.open();
+    connection.write(msg);
+    LOG.exiting("Sender", "send", Arrays.toString(new Object[]{msg, dest}));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/ChildNodeStruct.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/ChildNodeStruct.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/ChildNodeStruct.java
new file mode 100644
index 0000000..b69ec5d
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/ChildNodeStruct.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.task;
+
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
+
+import java.util.logging.Logger;
+
+/**
+ * Node structure used for a child node in an operator topology.
+ */
+public class ChildNodeStruct extends NodeStructImpl {
+
+  private static final Logger LOG = Logger.getLogger(ChildNodeStruct.class.getName());
+
+  /**
+   * @param id      identifier of the child node
+   * @param version version number, passed through to {@link NodeStructImpl}
+   */
+  public ChildNodeStruct(final String id, final int version) {
+    super(id, version);
+  }
+
+  /**
+   * Tells whether the given message marks this child as dead.
+   *
+   * @param gcm message received for this node
+   * @return {@code true} iff the message type is {@code ChildDead}
+   */
+  @Override
+  public boolean checkDead(final GroupCommunicationMessage gcm) {
+    LOG.entering("ChildNodeStruct", "checkDead", gcm);
+    final boolean retVal = gcm.getType() == ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildDead;
+    // Logger.exiting's last argument is the returned value, not the input.
+    LOG.exiting("ChildNodeStruct", "checkDead", retVal);
+    return retVal;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommGroupNetworkHandlerImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommGroupNetworkHandlerImpl.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommGroupNetworkHandlerImpl.java
new file mode 100644
index 0000000..72af1ec
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommGroupNetworkHandlerImpl.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.task;
+
+import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler;
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import org.apache.reef.io.network.group.impl.utils.Utils;
+import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Logger;
+
+/**
+ * Network handler for a single communication group that routes each incoming
+ * {@link GroupCommunicationMessage} to the operator named in the message.
+ * Messages of type {@code TopologyUpdated} or {@code TopologyChanges} are queued per operator and
+ * consumed by {@link #waitForTopologyChanges} / {@link #waitForTopologyUpdate}; all other messages
+ * are dispatched to the handler registered through {@link #register}.
+ */
+public class CommGroupNetworkHandlerImpl implements
+    CommGroupNetworkHandler {
+
+  private static final Logger LOG = Logger.getLogger(CommGroupNetworkHandlerImpl.class.getName());
+
+  /** Operator name -> handler for non-topology messages. */
+  private final Map<Class<? extends Name<String>>, EventHandler<GroupCommunicationMessage>> operHandlers =
+      new ConcurrentHashMap<>();
+  /** Operator name -> queue of pending topology notifications. */
+  private final Map<Class<? extends Name<String>>, BlockingQueue<GroupCommunicationMessage>> topologyNotifications =
+      new ConcurrentHashMap<>();
+
+  @Inject
+  public CommGroupNetworkHandlerImpl() {
+  }
+
+  /**
+   * Registers the handler that receives all non-topology messages addressed to {@code operName}.
+   *
+   * @param operName    operator name
+   * @param operHandler handler for that operator's messages
+   */
+  @Override
+  public void register(final Class<? extends Name<String>> operName,
+                       final EventHandler<GroupCommunicationMessage> operHandler) {
+    LOG.entering("CommGroupNetworkHandlerImpl", "register", new Object[]{Utils.simpleName(operName), operHandler});
+    operHandlers.put(operName, operHandler);
+    LOG.exiting("CommGroupNetworkHandlerImpl", "register",
+        Arrays.toString(new Object[]{Utils.simpleName(operName), operHandler}));
+  }
+
+  /**
+   * Creates the queue buffering topology notifications for {@code operName}, replacing any existing one.
+   *
+   * @param operName operator name
+   */
+  @Override
+  public void addTopologyElement(final Class<? extends Name<String>> operName) {
+    LOG.entering("CommGroupNetworkHandlerImpl", "addTopologyElement", Utils.simpleName(operName));
+    LOG.finest("Creating LBQ for " + operName);
+    topologyNotifications.put(operName, new LinkedBlockingQueue<GroupCommunicationMessage>());
+    LOG.exiting("CommGroupNetworkHandlerImpl", "addTopologyElement", Utils.simpleName(operName));
+  }
+
+  /**
+   * Routes a message to the topology queue or to the registered operator handler.
+   *
+   * @param msg incoming message
+   * @throws IllegalStateException if the addressed operator has no queue / handler registered
+   */
+  @Override
+  public void onNext(final GroupCommunicationMessage msg) {
+    LOG.entering("CommGroupNetworkHandlerImpl", "onNext", msg);
+    final Class<? extends Name<String>> operName = Utils.getClass(msg.getOperatorname());
+    if (msg.getType() == ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyUpdated
+        || msg.getType() == ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyChanges) {
+      getTopologyQueue(operName).add(msg);
+    } else {
+      final EventHandler<GroupCommunicationMessage> handler = operHandlers.get(operName);
+      if (handler == null) {
+        throw new IllegalStateException("No handler registered for operator " + operName.getSimpleName());
+      }
+      handler.onNext(msg);
+    }
+    LOG.exiting("CommGroupNetworkHandlerImpl", "onNext", msg);
+  }
+
+  /**
+   * Blocks until a topology notification for {@code operName} arrives and returns its payload.
+   *
+   * @param operName operator name
+   * @return data extracted from the notification by {@code Utils.getData}
+   * @throws RuntimeException if interrupted while waiting (the interrupt status is restored)
+   */
+  @Override
+  public byte[] waitForTopologyChanges(final Class<? extends Name<String>> operName) {
+    LOG.entering("CommGroupNetworkHandlerImpl", "waitForTopologyChanges", Utils.simpleName(operName));
+    final byte[] retVal = Utils.getData(takeTopologyNotification(operName));
+    LOG.exiting("CommGroupNetworkHandlerImpl", "waitForTopologyChanges", retVal);
+    return retVal;
+  }
+
+  /**
+   * Blocks until a topology notification for {@code operName} arrives and returns it.
+   *
+   * @param operName operator name
+   * @return the notification message
+   * @throws RuntimeException if interrupted while waiting (the interrupt status is restored)
+   */
+  @Override
+  public GroupCommunicationMessage waitForTopologyUpdate(final Class<? extends Name<String>> operName) {
+    LOG.entering("CommGroupNetworkHandlerImpl", "waitForTopologyUpdate", Utils.simpleName(operName));
+    final GroupCommunicationMessage retVal = takeTopologyNotification(operName);
+    LOG.exiting("CommGroupNetworkHandlerImpl", "waitForTopologyUpdate", retVal);
+    return retVal;
+  }
+
+  /** Takes the next topology notification for an operator, blocking until one is available. */
+  private GroupCommunicationMessage takeTopologyNotification(final Class<? extends Name<String>> operName) {
+    final BlockingQueue<GroupCommunicationMessage> queue = getTopologyQueue(operName);
+    try {
+      return queue.take();
+    } catch (final InterruptedException e) {
+      // Restore the interrupt flag so code further up the stack can still observe the interruption.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("InterruptedException while waiting for topology update of "
+          + operName.getSimpleName(), e);
+    }
+  }
+
+  /** Looks up an operator's topology queue, failing clearly if addTopologyElement was never called. */
+  private BlockingQueue<GroupCommunicationMessage> getTopologyQueue(final Class<? extends Name<String>> operName) {
+    final BlockingQueue<GroupCommunicationMessage> queue = topologyNotifications.get(operName);
+    if (queue == null) {
+      throw new IllegalStateException("No topology queue for operator " + operName.getSimpleName()
+          + "; addTopologyElement was not called for it");
+    }
+    return queue;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommunicationGroupClientImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommunicationGroupClientImpl.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommunicationGroupClientImpl.java
new file mode 100644
index 0000000..d0e6e9e
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommunicationGroupClientImpl.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.task;
+
+import org.apache.reef.driver.parameters.DriverIdentifier;
+import org.apache.reef.driver.task.TaskConfigurationOptions;
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.group.api.operators.Broadcast;
+import org.apache.reef.io.network.group.api.operators.GroupCommOperator;
+import org.apache.reef.io.network.group.api.operators.Reduce;
+import org.apache.reef.io.network.impl.NetworkService;
+import org.apache.reef.io.network.group.api.GroupChanges;
+import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler;
+import 
org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient;
+import org.apache.reef.io.network.group.api.task.GroupCommNetworkHandler;
+import org.apache.reef.io.network.group.impl.GroupChangesCodec;
+import org.apache.reef.io.network.group.impl.GroupChangesImpl;
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import 
org.apache.reef.io.network.group.impl.config.parameters.CommunicationGroupName;
+import org.apache.reef.io.network.group.impl.config.parameters.OperatorName;
+import 
org.apache.reef.io.network.group.impl.config.parameters.SerializedOperConfigs;
+import org.apache.reef.io.network.group.impl.operators.Sender;
+import org.apache.reef.io.network.group.impl.utils.Utils;
+import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
+import org.apache.reef.io.serialization.Codec;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.wake.EStage;
+import org.apache.reef.wake.impl.ThreadPoolStage;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Logger;
+
+public class CommunicationGroupClientImpl implements 
CommunicationGroupServiceClient {
+  private static final Logger LOG = 
Logger.getLogger(CommunicationGroupClientImpl.class.getName());
+
+  private final GroupCommNetworkHandler groupCommNetworkHandler;
+  private final Class<? extends Name<String>> groupName;
+  private final Map<Class<? extends Name<String>>, GroupCommOperator> 
operators;
+  private final Sender sender;
+
+  private final String taskId;
+
+  private final String driverId;
+
+  private final CommGroupNetworkHandler commGroupNetworkHandler;
+
+  private final AtomicBoolean init = new AtomicBoolean(false);
+
+  @Inject
+  public CommunicationGroupClientImpl(@Parameter(CommunicationGroupName.class) 
final String groupName,
+                                      
@Parameter(TaskConfigurationOptions.Identifier.class) final String taskId,
+                                      @Parameter(DriverIdentifier.class) final 
String driverId,
+                                      final GroupCommNetworkHandler 
groupCommNetworkHandler,
+                                      @Parameter(SerializedOperConfigs.class) 
final Set<String> operatorConfigs,
+                                      final ConfigurationSerializer 
configSerializer,
+                                      final 
NetworkService<GroupCommunicationMessage> netService) {
+    this.taskId = taskId;
+    this.driverId = driverId;
+    LOG.finest(groupName + " has GroupCommHandler-" + 
groupCommNetworkHandler.toString());
+    this.groupName = Utils.getClass(groupName);
+    this.groupCommNetworkHandler = groupCommNetworkHandler;
+    this.sender = new Sender(netService);
+    this.operators = new TreeMap<>(new Comparator<Class<? extends 
Name<String>>>() {
+
+      @Override
+      public int compare(final Class<? extends Name<String>> o1, final Class<? 
extends Name<String>> o2) {
+        final String s1 = o1.getSimpleName();
+        final String s2 = o2.getSimpleName();
+        return s1.compareTo(s2);
+      }
+    });
+    try {
+      this.commGroupNetworkHandler = 
Tang.Factory.getTang().newInjector().getInstance(CommGroupNetworkHandler.class);
+      this.groupCommNetworkHandler.register(this.groupName, 
commGroupNetworkHandler);
+
+      for (final String operatorConfigStr : operatorConfigs) {
+
+        final Configuration operatorConfig = 
configSerializer.fromString(operatorConfigStr);
+        final Injector injector = 
Tang.Factory.getTang().newInjector(operatorConfig);
+
+        
injector.bindVolatileParameter(TaskConfigurationOptions.Identifier.class, 
taskId);
+        injector.bindVolatileParameter(CommunicationGroupName.class, 
groupName);
+        injector.bindVolatileInstance(CommGroupNetworkHandler.class, 
commGroupNetworkHandler);
+        injector.bindVolatileInstance(NetworkService.class, netService);
+        injector.bindVolatileInstance(CommunicationGroupServiceClient.class, 
this);
+
+        final GroupCommOperator operator = 
injector.getInstance(GroupCommOperator.class);
+        final String operName = injector.getNamedInstance(OperatorName.class);
+        this.operators.put(Utils.getClass(operName), operator);
+        LOG.finest(operName + " has CommGroupHandler-" + 
commGroupNetworkHandler.toString());
+      }
+    } catch (final InjectionException | IOException e) {
+      throw new RuntimeException("Unable to deserialize operator config", e);
+    }
+  }
+
+  @Override
+  public Broadcast.Sender getBroadcastSender(final Class<? extends 
Name<String>> operatorName) {
+    LOG.entering("CommunicationGroupClientImpl", "getBroadcastSender", new 
Object[]{getQualifiedName(),
+        Utils.simpleName(operatorName)});
+    final GroupCommOperator op = operators.get(operatorName);
+    if (!(op instanceof Broadcast.Sender)) {
+      throw new RuntimeException("Configured operator is not a broadcast 
sender");
+    }
+    commGroupNetworkHandler.addTopologyElement(operatorName);
+    LOG.exiting("CommunicationGroupClientImpl", "getBroadcastSender", 
getQualifiedName() + op);
+    return (Broadcast.Sender) op;
+  }
+
+  @Override
+  public Reduce.Receiver getReduceReceiver(final Class<? extends Name<String>> 
operatorName) {
+    LOG.entering("CommunicationGroupClientImpl", "getReduceReceiver", new 
Object[]{getQualifiedName(),
+        Utils.simpleName(operatorName)});
+    final GroupCommOperator op = operators.get(operatorName);
+    if (!(op instanceof Reduce.Receiver)) {
+      throw new RuntimeException("Configured operator is not a reduce 
receiver");
+    }
+    commGroupNetworkHandler.addTopologyElement(operatorName);
+    LOG.exiting("CommunicationGroupClientImpl", "getReduceReceiver", 
getQualifiedName() + op);
+    return (Reduce.Receiver) op;
+  }
+
+  @Override
+  public Broadcast.Receiver getBroadcastReceiver(final Class<? extends 
Name<String>> operatorName) {
+    LOG.entering("CommunicationGroupClientImpl", "getBroadcastReceiver", new 
Object[]{getQualifiedName(),
+        Utils.simpleName(operatorName)});
+    final GroupCommOperator op = operators.get(operatorName);
+    if (!(op instanceof Broadcast.Receiver)) {
+      throw new RuntimeException("Configured operator is not a broadcast 
receiver");
+    }
+    commGroupNetworkHandler.addTopologyElement(operatorName);
+    LOG.exiting("CommunicationGroupClientImpl", "getBroadcastReceiver", 
getQualifiedName() + op);
+    return (Broadcast.Receiver) op;
+  }
+
+  @Override
+  public Reduce.Sender getReduceSender(final Class<? extends Name<String>> 
operatorName) {
+    LOG.entering("CommunicationGroupClientImpl", "getReduceSender", new 
Object[]{getQualifiedName(),
+        Utils.simpleName(operatorName)});
+    final GroupCommOperator op = operators.get(operatorName);
+    if (!(op instanceof Reduce.Sender)) {
+      throw new RuntimeException("Configured operator is not a reduce sender");
+    }
+    commGroupNetworkHandler.addTopologyElement(operatorName);
+    LOG.exiting("CommunicationGroupClientImpl", "getReduceSender", 
getQualifiedName() + op);
+    return (Reduce.Sender) op;
+  }
+
+  @Override
+  public void initialize() {
+    LOG.entering("CommunicationGroupClientImpl", "initialize", 
getQualifiedName());
+    if (init.compareAndSet(false, true)) {
+      LOG.finest("CommGroup-" + groupName + " is initializing");
+      final CountDownLatch initLatch = new CountDownLatch(operators.size());
+
+      final InitHandler initHandler = new InitHandler(initLatch);
+      final EStage<GroupCommOperator> initStage = new 
ThreadPoolStage<>(initHandler, operators.size());
+      for (final GroupCommOperator op : operators.values()) {
+        initStage.onNext(op);
+      }
+
+      try {
+        initLatch.await();
+      } catch (final InterruptedException e) {
+        throw new RuntimeException("InterruptedException while waiting for 
initialization", e);
+      }
+
+      if (initHandler.getException() != null) {
+        throw new RuntimeException(getQualifiedName() + "Parent dead. Current 
behavior is for the child to die too.");
+      }
+    }
+    LOG.exiting("CommunicationGroupClientImpl", "initialize", 
getQualifiedName());
+  }
+
+  @Override
+  public GroupChanges getTopologyChanges() {
+    LOG.entering("CommunicationGroupClientImpl", "getTopologyChanges", 
getQualifiedName());
+    for (final GroupCommOperator op : operators.values()) {
+      final Class<? extends Name<String>> operName = op.getOperName();
+      LOG.finest("Sending TopologyChanges msg to driver");
+      try {
+        sender.send(Utils.bldVersionedGCM(groupName, operName, 
ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyChanges, taskId, 
op.getVersion(), driverId,
+            0, Utils.EmptyByteArr));
+      } catch (final NetworkException e) {
+        throw new RuntimeException("NetworkException while sending 
GetTopologyChanges", e);
+      }
+    }
+    final Codec<GroupChanges> changesCodec = new GroupChangesCodec();
+    final Map<Class<? extends Name<String>>, GroupChanges> perOpChanges = new 
HashMap<>();
+    for (final GroupCommOperator op : operators.values()) {
+      final Class<? extends Name<String>> operName = op.getOperName();
+      final byte[] changes = 
commGroupNetworkHandler.waitForTopologyChanges(operName);
+      perOpChanges.put(operName, changesCodec.decode(changes));
+    }
+    final GroupChanges retVal = mergeGroupChanges(perOpChanges);
+    LOG.exiting("CommunicationGroupClientImpl", "getTopologyChanges", 
getQualifiedName() + retVal);
+    return retVal;
+  }
+
+  /**
+   * @param perOpChanges
+   * @return
+   */
+  private GroupChanges mergeGroupChanges(final Map<Class<? extends 
Name<String>>, GroupChanges> perOpChanges) {
+    LOG.entering("CommunicationGroupClientImpl", "mergeGroupChanges", new 
Object[]{getQualifiedName(), perOpChanges});
+    boolean doChangesExist = false;
+    for (final GroupChanges change : perOpChanges.values()) {
+      if (change.exist()) {
+        doChangesExist = true;
+        break;
+      }
+    }
+    final GroupChanges changes = new GroupChangesImpl(doChangesExist);
+    LOG.exiting("CommunicationGroupClientImpl", "mergeGroupChanges", 
getQualifiedName() + changes);
+    return changes;
+  }
+
+  @Override
+  public void updateTopology() {
+    LOG.entering("CommunicationGroupClientImpl", "updateTopology", 
getQualifiedName());
+    for (final GroupCommOperator op : operators.values()) {
+      final Class<? extends Name<String>> operName = op.getOperName();
+      try {
+        sender.send(Utils.bldVersionedGCM(groupName, operName, 
ReefNetworkGroupCommProtos.GroupCommMessage.Type.UpdateTopology, taskId, 
op.getVersion(), driverId,
+            0, Utils.EmptyByteArr));
+      } catch (final NetworkException e) {
+        throw new RuntimeException("NetworkException while sending 
UpdateTopology", e);
+      }
+    }
+    for (final GroupCommOperator op : operators.values()) {
+      final Class<? extends Name<String>> operName = op.getOperName();
+      GroupCommunicationMessage msg;
+      do {
+        msg = commGroupNetworkHandler.waitForTopologyUpdate(operName);
+      } while (!isMsgVersionOk(msg));
+    }
+    LOG.exiting("CommunicationGroupClientImpl", "updateTopology", 
getQualifiedName());
+  }
+
+  private boolean isMsgVersionOk(final GroupCommunicationMessage msg) {
+    LOG.entering("CommunicationGroupClientImpl", "isMsgVersionOk", new 
Object[]{getQualifiedName(), msg});
+    if (msg.hasVersion()) {
+      final int msgVersion = msg.getVersion();
+      final GroupCommOperator operator = 
operators.get(Utils.getClass(msg.getOperatorname()));
+      final int nodeVersion = operator.getVersion();
+      final boolean retVal;
+      if (msgVersion < nodeVersion) {
+        LOG.warning(getQualifiedName() + "Received a ver-" + msgVersion + " 
msg while expecting ver-" + nodeVersion
+            + ". Discarding msg");
+        retVal = false;
+      } else {
+        retVal = true;
+      }
+      LOG.exiting("CommunicationGroupClientImpl", "isMsgVersionOk", 
Arrays.toString(new Object[]{retVal, getQualifiedName(), msg}));
+      return retVal;
+    } else {
+      throw new RuntimeException(getQualifiedName() + "can only deal with 
versioned msgs");
+    }
+  }
+
+  private String getQualifiedName() {
+    return Utils.simpleName(groupName) + " ";
+  }
+
+  @Override
+  public Class<? extends Name<String>> getName() {
+    return groupName;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/GroupCommClientImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/GroupCommClientImpl.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/GroupCommClientImpl.java
new file mode 100644
index 0000000..f8f6db4
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/GroupCommClientImpl.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.task;
+
+import org.apache.reef.driver.task.TaskConfigurationOptions;
+import org.apache.reef.io.network.impl.NetworkService;
+import org.apache.reef.io.network.group.api.task.CommunicationGroupClient;
+import 
org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient;
+import org.apache.reef.io.network.group.api.task.GroupCommClient;
+import org.apache.reef.io.network.group.api.task.GroupCommNetworkHandler;
+import 
org.apache.reef.io.network.group.impl.config.parameters.SerializedGroupConfigs;
+import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class GroupCommClientImpl implements GroupCommClient {
+  private static final Logger LOG = 
Logger.getLogger(GroupCommClientImpl.class.getName());
+
+  private final Map<Class<? extends Name<String>>, 
CommunicationGroupServiceClient> communicationGroups = new HashMap<>();
+
+  @Inject
+  public GroupCommClientImpl(
+      final @Parameter(SerializedGroupConfigs.class) Set<String> groupConfigs,
+      final @Parameter(TaskConfigurationOptions.Identifier.class) String 
taskId,
+      final GroupCommNetworkHandler groupCommNetworkHandler,
+      final NetworkService<ReefNetworkGroupCommProtos.GroupCommMessage> 
netService,
+      final ConfigurationSerializer configSerializer) {
+
+    LOG.log(Level.FINEST, "GroupCommHandler-{0}", groupCommNetworkHandler);
+
+    for (final String groupConfigStr : groupConfigs) {
+      try {
+        final Configuration groupConfig = 
configSerializer.fromString(groupConfigStr);
+
+        final Injector injector = 
Tang.Factory.getTang().newInjector(groupConfig);
+        
injector.bindVolatileParameter(TaskConfigurationOptions.Identifier.class, 
taskId);
+        injector.bindVolatileInstance(GroupCommNetworkHandler.class, 
groupCommNetworkHandler);
+        injector.bindVolatileInstance(NetworkService.class, netService);
+
+        final CommunicationGroupServiceClient commGroupClient =
+            injector.getInstance(CommunicationGroupServiceClient.class);
+
+        this.communicationGroups.put(commGroupClient.getName(), 
commGroupClient);
+
+      } catch (final InjectionException | IOException e) {
+        throw new RuntimeException("Unable to deserialize operator config", e);
+      }
+    }
+  }
+
+  @Override
+  public CommunicationGroupClient getCommunicationGroup(
+      final Class<? extends Name<String>> groupName) {
+    return communicationGroups.get(groupName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/GroupCommNetworkHandlerImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/GroupCommNetworkHandlerImpl.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/GroupCommNetworkHandlerImpl.java
new file mode 100644
index 0000000..a1e9277
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/GroupCommNetworkHandlerImpl.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.task;
+
+import org.apache.reef.io.network.Message;
+import org.apache.reef.io.network.group.api.task.GroupCommNetworkHandler;
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.logging.Logger;
+
+public class GroupCommNetworkHandlerImpl implements GroupCommNetworkHandler {
+
+  private static final Logger LOG = 
Logger.getLogger(GroupCommNetworkHandlerImpl.class.getName());
+
+  private final Map<Class<? extends Name<String>>, 
EventHandler<GroupCommunicationMessage>> commGroupHandlers = new 
ConcurrentHashMap<>();
+
+  @Inject
+  public GroupCommNetworkHandlerImpl() {
+  }
+
+  @Override
+  public void onNext(final Message<GroupCommunicationMessage> mesg) {
+    LOG.entering("GroupCommNetworkHandlerImpl", "onNext", mesg);
+    final Iterator<GroupCommunicationMessage> iter = mesg.getData().iterator();
+    final GroupCommunicationMessage msg = iter.hasNext() ? iter.next() : null;
+    try {
+      final Class<? extends Name<String>> groupName = (Class<? extends 
Name<String>>) Class.forName(msg.getGroupname());
+      commGroupHandlers.get(groupName).onNext(msg);
+    } catch (final ClassNotFoundException e) {
+      throw new RuntimeException("GroupName not found", e);
+    }
+    LOG.exiting("GroupCommNetworkHandlerImpl", "onNext", mesg);
+  }
+
+  @Override
+  public void register(final Class<? extends Name<String>> groupName,
+                       final EventHandler<GroupCommunicationMessage> 
commGroupNetworkHandler) {
+    LOG.entering("GroupCommNetworkHandlerImpl", "register", new 
Object[]{groupName,
+        commGroupNetworkHandler});
+    commGroupHandlers.put(groupName, commGroupNetworkHandler);
+    LOG.exiting("GroupCommNetworkHandlerImpl", "register", Arrays.toString(new 
Object[]{groupName,
+        commGroupNetworkHandler}));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/InitHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/InitHandler.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/InitHandler.java
new file mode 100644
index 0000000..13bd644
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/InitHandler.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.task;
+
+import org.apache.reef.io.network.exception.ParentDeadException;
+import org.apache.reef.io.network.group.api.operators.GroupCommOperator;
+import org.apache.reef.wake.EventHandler;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.logging.Logger;
+
+class InitHandler implements EventHandler<GroupCommOperator> {
+
+  private static final Logger LOG = 
Logger.getLogger(InitHandler.class.getName());
+
+  private ParentDeadException exception = null;
+  private final CountDownLatch initLatch;
+
+  public InitHandler(final CountDownLatch initLatch) {
+    this.initLatch = initLatch;
+  }
+
+  @Override
+  public void onNext(final GroupCommOperator op) {
+    LOG.entering("InitHandler", "onNext", op);
+    try {
+      op.initialize();
+    } catch (final ParentDeadException e) {
+      this.exception = e;
+    }
+    initLatch.countDown();
+    LOG.exiting("InitHandler", "onNext", op);
+  }
+
+  public ParentDeadException getException() {
+    return exception;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/NodeStructImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/NodeStructImpl.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/NodeStructImpl.java
new file mode 100644
index 0000000..5976352
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/NodeStructImpl.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.task;
+
+import org.apache.reef.io.network.group.api.task.NodeStruct;
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import org.apache.reef.io.network.group.impl.utils.Utils;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Logger;
+
+public abstract class NodeStructImpl implements NodeStruct {
+
+  private static final Logger LOG = 
Logger.getLogger(NodeStructImpl.class.getName());
+
+  private final String id;
+  private final BlockingQueue<GroupCommunicationMessage> dataQue = new 
LinkedBlockingQueue<>();
+
+  private int version;
+
+  public NodeStructImpl(final String id, final int version) {
+    super();
+    this.id = id;
+    this.version = version;
+  }
+
+  @Override
+  public int getVersion() {
+    return version;
+  }
+
+  @Override
+  public void setVersion(final int version) {
+    this.version = version;
+  }
+
+  @Override
+  public String getId() {
+    return id;
+  }
+
+  @Override
+  public void addData(final GroupCommunicationMessage msg) {
+    LOG.entering("NodeStructImpl", "addData", msg);
+    dataQue.add(msg);
+    LOG.exiting("NodeStructImpl", "addData", msg);
+  }
+
+  @Override
+  public byte[] getData() {
+    LOG.entering("NodeStructImpl", "getData");
+    GroupCommunicationMessage gcm;
+    try {
+      gcm = dataQue.take();
+    } catch (final InterruptedException e) {
+      throw new RuntimeException("InterruptedException while waiting for data 
from " + id, e);
+    }
+
+    final byte[] retVal = checkDead(gcm) ? null : Utils.getData(gcm);
+    LOG.exiting("NodeStructImpl", "getData", retVal);
+    return retVal;
+  }
+
+  @Override
+  public String toString() {
+    return "(" + id + "," + version + ")";
+  }
+
+  @Override
+  public boolean equals(final Object obj) {
+    if (obj instanceof NodeStructImpl) {
+      final NodeStructImpl that = (NodeStructImpl) obj;
+      return this.id.equals(that.id) && this.version == that.version;
+    } else {
+      return false;
+    }
+  }
+
+  public abstract boolean checkDead(final GroupCommunicationMessage gcm);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyImpl.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyImpl.java
new file mode 100644
index 0000000..f46031b
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyImpl.java
@@ -0,0 +1,466 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.task;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.exception.ParentDeadException;
+import org.apache.reef.io.network.group.api.operators.Reduce;
+import org.apache.reef.io.network.group.api.task.OperatorTopology;
+import org.apache.reef.io.network.group.api.task.OperatorTopologyStruct;
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import org.apache.reef.io.network.group.impl.operators.Sender;
+import org.apache.reef.io.network.group.impl.utils.ResettingCountDownLatch;
+import org.apache.reef.io.network.group.impl.utils.Utils;
+import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
+import org.apache.reef.io.serialization.Codec;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.wake.EStage;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.impl.SingleThreadStage;
+
+import javax.inject.Inject;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Logger;
+
+public class OperatorTopologyImpl implements OperatorTopology {
+
+  private static final Logger LOG = 
Logger.getLogger(OperatorTopologyImpl.class.getName());
+
+  private final Class<? extends Name<String>> groupName;
+  private final Class<? extends Name<String>> operName;
+  private final String selfId;
+  private final String driverId;
+  private final Sender sender;
+  private final Object topologyLock = new Object();
+
+  private final int version;
+
+  private final BlockingQueue<GroupCommunicationMessage> deltas = new 
LinkedBlockingQueue<>();
+  private final BlockingQueue<GroupCommunicationMessage> deletionDeltas = new 
LinkedBlockingQueue<>();
+
+  private OperatorTopologyStruct baseTopology;
+  private OperatorTopologyStruct effectiveTopology;
+  private final ResettingCountDownLatch topologyLockAquired = new 
ResettingCountDownLatch(1);
+  private final AtomicBoolean updatingTopo = new AtomicBoolean(false);
+
+  private final EventHandler<GroupCommunicationMessage> 
baseTopologyUpdateHandler = new BaseTopologyUpdateHandler();
+
+  private final EStage<GroupCommunicationMessage> baseTopologyUpdateStage = 
new SingleThreadStage<>(
+      "BaseTopologyUpdateStage",
+      baseTopologyUpdateHandler,
+      5);
+
+  private final EventHandler<GroupCommunicationMessage> 
dataHandlingStageHandler = new DataHandlingStageHandler();
+
+  // The queue capacity might determine how many tasks can be handled
+  private final EStage<GroupCommunicationMessage> dataHandlingStage = new 
SingleThreadStage<>("DataHandlingStage",
+      dataHandlingStageHandler,
+      10000);
+
+  @Inject
+  public OperatorTopologyImpl(final Class<? extends Name<String>> groupName,
+                              final Class<? extends Name<String>> operName, 
final String selfId,
+                              final String driverId, final Sender sender, 
final int version) {
+    super();
+    this.groupName = groupName;
+    this.operName = operName;
+    this.selfId = selfId;
+    this.driverId = driverId;
+    this.sender = sender;
+    this.version = version;
+  }
+
+  /**
+   * Handle messages meant for this operator. Data msgs are passed on
+   * to the DataHandlingStage while Ctrl msgs are queued up for the
+   * base topology to update later. Ctrl msgs signalling death of a
+   * task are also routed to the effectiveTopology in order to notify
+   * a waiting operation. During initialization when effective topology
+   * is not yet set-up, these *Dead msgs are queued in deletionDeltas
+   * for the small time window when these arrive after baseTopology has
+   * received TopologySetup but not yet created the effectiveTopology.
+   * Most times the msgs in the deletionDeltas will be discarded as stale
+   * msgs
+   * <p/>
+   * No synchronization is needed while handling *Dead messages.
+   * There 2 states: UpdatingTopo & NotUpdatingTopo
+   * If UpdatingTopo, deltas.put still takes care of adding this msg to effTop 
through baseTopo changes.
+   * If not, we add to effTopo. So we are good.
+   * <p/>
+   * However, for data msgs synchronization is needed. Look at doc of
+   * DataHandlingStage
+   * <p/>
+   * Adding to deletionDeltas should be outside
+   * effTopo!=null block. There is a rare possibility that during 
initialization
+   * just after baseTopo is created(so deltas will be ignored) & just before
+   * effTopo is created(so effTopo will be null) where we can miss a deletion
+   * msg if not added to deletionDelta because this method is synchronized
+   */
+  @Override
+  public void handle(final GroupCommunicationMessage msg) {
+    LOG.entering("OperatorTopologyImpl", "handle", new 
Object[]{getQualifiedName(), msg});
+    if (isMsgVersionOk(msg)) {
+      try {
+        switch (msg.getType()) {
+          case UpdateTopology:
+            updatingTopo.set(true);
+            baseTopologyUpdateStage.onNext(msg);
+            topologyLockAquired.awaitAndReset(1);
+            LOG.finest(getQualifiedName() + "topoLockAquired CDL released. 
Resetting it to new CDL");
+            sendAckToDriver(msg);
+            break;
+
+          case TopologySetup:
+            LOG.finest(getQualifiedName() + "Adding to deltas queue");
+            deltas.put(msg);
+            break;
+
+          case ParentAdd:
+          case ChildAdd:
+            LOG.finest(getQualifiedName() + "Adding to deltas queue");
+            deltas.put(msg);
+            break;
+
+          case ParentDead:
+          case ChildDead:
+            LOG.finest(getQualifiedName() + "Adding to deltas queue");
+            deltas.put(msg);
+
+            LOG.finest(getQualifiedName() + "Adding to deletionDeltas queue");
+            deletionDeltas.put(msg);
+
+            if (effectiveTopology != null) {
+              LOG.finest(getQualifiedName() + "Adding as data msg to non-null 
effective topology struct");
+              effectiveTopology.addAsData(msg);
+            } else {
+              LOG.fine(getQualifiedName() + "Received a death message before 
effective topology was setup. CAUTION");
+            }
+            break;
+
+          default:
+            dataHandlingStage.onNext(msg);
+        }
+      } catch (final InterruptedException e) {
+        throw new RuntimeException("InterruptedException while trying to put 
ctrl msg into delta queue", e);
+      }
+    }
+    LOG.exiting("OperatorTopologyImpl", "handle", Arrays.toString(new 
Object[]{getQualifiedName(), msg}));
+  }
+
+  private boolean isMsgVersionOk(final GroupCommunicationMessage msg) {
+    LOG.entering("OperatorTopologyImpl", "isMsgVersionOk", new 
Object[]{getQualifiedName(), msg});
+    if (msg.hasVersion()) {
+      final int msgVersion = msg.getVersion();
+      final boolean retVal;
+      if (msgVersion < version) {
+        LOG.warning(getQualifiedName() + "Received a ver-" + msgVersion + " 
msg while expecting ver-" + version
+            + ". Discarding msg");
+        retVal = false;
+      } else {
+        retVal = true;
+      }
+      LOG.exiting("OperatorTopologyImpl", "isMsgVersionOk", 
Arrays.toString(new Object[]{retVal, getQualifiedName(), msg}));
+      return retVal;
+    } else {
+      throw new RuntimeException(getQualifiedName() + "can only deal with 
versioned msgs");
+    }
+  }
+
+  @Override
+  public void initialize() throws ParentDeadException {
+    LOG.entering("OperatorTopologyImpl", "initialize", getQualifiedName());
+    createBaseTopology();
+    LOG.exiting("OperatorTopologyImpl", "initialize", getQualifiedName());
+  }
+
+  @Override
+  public void sendToParent(final byte[] data, final 
ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType) throws 
ParentDeadException {
+    LOG.entering("OperatorTopologyImpl", "sendToParent", new 
Object[]{getQualifiedName(), data, msgType});
+    refreshEffectiveTopology();
+    assert (effectiveTopology != null);
+    effectiveTopology.sendToParent(data, msgType);
+    LOG.exiting("OperatorTopologyImpl", "sendToParent", Arrays.toString(new 
Object[]{getQualifiedName(), data, msgType}));
+  }
+
+  @Override
+  public void sendToChildren(final byte[] data, final 
ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType) throws 
ParentDeadException {
+    LOG.entering("OperatorTopologyImpl", "sendToChildren", new 
Object[]{getQualifiedName(), data, msgType});
+    refreshEffectiveTopology();
+    assert (effectiveTopology != null);
+    effectiveTopology.sendToChildren(data, msgType);
+    LOG.exiting("OperatorTopologyImpl", "sendToChildren", Arrays.toString(new 
Object[]{getQualifiedName(), data, msgType}));
+  }
+
+  @Override
+  public byte[] recvFromParent() throws ParentDeadException {
+    LOG.entering("OperatorTopologyImpl", "recvFromParent", getQualifiedName());
+    refreshEffectiveTopology();
+    assert (effectiveTopology != null);
+    final byte[] retVal = effectiveTopology.recvFromParent();
+    LOG.exiting("OperatorTopologyImpl", "recvFromParent", Arrays.toString(new 
Object[]{getQualifiedName(), retVal}));
+    return retVal;
+  }
+
+  @Override
+  public <T> T recvFromChildren(final Reduce.ReduceFunction<T> redFunc, final 
Codec<T> dataCodec) throws ParentDeadException {
+    LOG.entering("OperatorTopologyImpl", "recvFromChildren", 
getQualifiedName());
+    refreshEffectiveTopology();
+    assert (effectiveTopology != null);
+    final T retVal = effectiveTopology.recvFromChildren(redFunc, dataCodec);
+    LOG.exiting("OperatorTopologyImpl", "recvFromChildren", 
Arrays.toString(new Object[]{getQualifiedName(), retVal}));
+    return retVal;
+  }
+
+  /**
+   * Only refreshes the effective topology with deletion msgs from
+   * deletionDeltas queue
+   *
+   * @throws ParentDeadException
+   */
+  private void refreshEffectiveTopology() throws ParentDeadException {
+    LOG.entering("OperatorTopologyImpl", "refreshEffectiveTopology", 
getQualifiedName());
+    LOG.finest(getQualifiedName() + "Waiting to acquire topoLock");
+    synchronized (topologyLock) {
+      LOG.finest(getQualifiedName() + "Acquired topoLock");
+
+      assert (effectiveTopology != null);
+
+      final Set<GroupCommunicationMessage> deletionDeltas = new HashSet<>();
+      copyDeletionDeltas(deletionDeltas);
+
+      LOG.finest(getQualifiedName() + "Updating effective topology struct with 
deletion msgs");
+      effectiveTopology.update(deletionDeltas);
+      LOG.finest(getQualifiedName() + "Released topoLock");
+    }
+    LOG.exiting("OperatorTopologyImpl", "refreshEffectiveTopology", 
getQualifiedName());
+  }
+
+  /**
+   * @throws ParentDeadException
+   */
+  private void createBaseTopology() throws ParentDeadException {
+    LOG.entering("OperatorTopologyImpl", "createBaseTopology", 
getQualifiedName());
+    baseTopology = new OperatorTopologyStructImpl(groupName, operName, selfId, 
driverId, sender, version);
+    updateBaseTopology();
+    LOG.exiting("OperatorTopologyImpl", "createBaseTopology", 
getQualifiedName());
+  }
+
+  /**
+   * Blocking method that waits till the base topology is updated Unblocks when
+   * we receive a TopologySetup msg from driver
+   * <p/>
+   * Will also update the effective topology when the base topology is updated
+   * so that creation of effective topology is limited to just this method and
+   * refresh will only refresh the effective topology with deletion msgs from
+   * deletionDeltas queue
+   *
+   * @throws ParentDeadException
+   */
+  private void updateBaseTopology() throws ParentDeadException {
+    LOG.entering("OperatorTopologyImpl", "updateBaseTopology", 
getQualifiedName());
+    LOG.finest(getQualifiedName() + "Waiting to acquire topoLock");
+    synchronized (topologyLock) {
+      LOG.finest(getQualifiedName() + "Acquired topoLock");
+      try {
+        assert (baseTopology != null);
+        LOG.finest(getQualifiedName() + "Updating base topology. So setting 
dirty bit");
+        baseTopology.setChanges(true);
+
+        LOG.finest(getQualifiedName() + "Waiting for ctrl msgs");
+        for (GroupCommunicationMessage msg = deltas.take(); msg.getType() != 
ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologySetup; msg = 
deltas.take()) {
+          LOG.finest(getQualifiedName() + "Got " + msg.getType() + " msg from 
" + msg.getSrcid());
+          if (effectiveTopology == null && msg.getType() == 
ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentDead) {
+            /**
+             * If effectiveTopology!=null, this method is being called from 
the BaseTopologyUpdateStage
+             * And exception thrown will be caught by uncaughtExceptionHandler 
leading to System.exit
+             */
+            LOG.finer(getQualifiedName() + "Throwing ParentDeadException");
+            throw new ParentDeadException(getQualifiedName()
+                + "Parent dead. Current behavior is for the child to die 
too.");
+          } else {
+            LOG.finest(getQualifiedName() + "Updating basetopology struct");
+            baseTopology.update(msg);
+            sendAckToDriver(msg);
+          }
+          LOG.finest(getQualifiedName() + "Waiting for ctrl msgs");
+        }
+
+        updateEffTopologyFromBaseTopology();
+
+      } catch (final InterruptedException e) {
+        throw new RuntimeException("InterruptedException while waiting for 
delta msg from driver", e);
+      }
+      LOG.finest(getQualifiedName() + "Released topoLock");
+    }
+    LOG.exiting("OperatorTopologyImpl", "updateBaseTopology", 
getQualifiedName());
+  }
+
+  private void sendAckToDriver(final GroupCommunicationMessage msg) {
+    LOG.entering("OperatorTopologyImpl", "sendAckToDriver", new 
Object[]{getQualifiedName(), msg});
+    try {
+      final String srcId = msg.getSrcid();
+      if (msg.hasVersion()) {
+        final int srcVersion = msg.getSrcVersion();
+        switch (msg.getType()) {
+          case UpdateTopology:
+            sender.send(Utils.bldVersionedGCM(groupName, operName, 
ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologySetup, selfId, 
this.version, driverId,
+                srcVersion, Utils.EmptyByteArr));
+            break;
+          case ParentAdd:
+            sender.send(Utils.bldVersionedGCM(groupName, operName, 
ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentAdded, selfId, 
this.version, srcId,
+                srcVersion, Utils.EmptyByteArr), driverId);
+            break;
+          case ParentDead:
+            sender.send(Utils.bldVersionedGCM(groupName, operName, 
ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentRemoved, selfId, 
this.version, srcId,
+                srcVersion, Utils.EmptyByteArr), driverId);
+            break;
+          case ChildAdd:
+            sender.send(Utils.bldVersionedGCM(groupName, operName, 
ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdded, selfId, 
this.version, srcId,
+                srcVersion, Utils.EmptyByteArr), driverId);
+            break;
+          case ChildDead:
+            sender.send(Utils.bldVersionedGCM(groupName, operName, 
ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildRemoved, selfId, 
this.version, srcId,
+                srcVersion, Utils.EmptyByteArr), driverId);
+            break;
+          default:
+            throw new RuntimeException("Received a non control message for 
acknowledgement");
+        }
+      } else {
+        throw new RuntimeException(getQualifiedName() + "Ack Sender can only 
deal with versioned msgs");
+      }
+    } catch (final NetworkException e) {
+      throw new RuntimeException("NetworkException while sending ack to driver 
for delta msg " + msg.getType(), e);
+    }
+    LOG.exiting("OperatorTopologyImpl", "sendAckToDriver", Arrays.toString(new 
Object[]{getQualifiedName(), msg}));
+  }
+
+  private void updateEffTopologyFromBaseTopology() {
+    LOG.entering("OperatorTopologyImpl", "updateEffTopologyFromBaseTopology", 
getQualifiedName());
+    assert (baseTopology != null);
+    LOG.finest(getQualifiedName() + "Updaing effective topology");
+    if (baseTopology.hasChanges()) {
+      //Create effectiveTopology from baseTopology
+      effectiveTopology = new OperatorTopologyStructImpl(baseTopology);
+      baseTopology.setChanges(false);
+    }
+    LOG.exiting("OperatorTopologyImpl", "updateEffTopologyFromBaseTopology", 
getQualifiedName());
+  }
+
+  /**
+   * @param deletionDeltasForUpdate
+   * @throws ParentDeadException
+   */
+  private void copyDeletionDeltas(final Set<GroupCommunicationMessage> 
deletionDeltasForUpdate)
+      throws ParentDeadException {
+    LOG.entering("OperatorTopologyImpl", "copyDeletionDeltas", new 
Object[]{getQualifiedName(),
+        deletionDeltasForUpdate});
+    this.deletionDeltas.drainTo(deletionDeltasForUpdate);
+    for (final GroupCommunicationMessage msg : deletionDeltasForUpdate) {
+      final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType = 
msg.getType();
+      if (msgType == 
ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentDead) {
+        throw new ParentDeadException(getQualifiedName() + "Parent dead. 
Current behavior is for the child to die too.");
+      }
+    }
+    LOG.exiting("OperatorTopologyImpl", "copyDeletionDeltas", 
Arrays.toString(new Object[]{getQualifiedName(),
+        deletionDeltasForUpdate}));
+  }
+
+  private String getQualifiedName() {
+    return Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + 
":" + selfId + ":ver(" + version + ") - ";
+  }
+
+  /**
+   * Unlike Dead msgs this needs to be synchronized because data msgs are not
+   * routed through the base topo changes So we need to make sure to wait for
+   * updateTopo to complete and for the new effective topo to take effect. 
Hence
+   * updatinTopo is set to false in refreshEffTopo. Also, since this is called
+   * from a netty IO thread, we need to create a stage to move the msgs from
+   * netty space to application space and release the netty threads. Otherwise
+   * weird deadlocks can happen Ex: Sent model to k nodes using broadcast. Send
+   * to K+1 th is waiting for ACK. The K nodes already compute their states and
+   * reduce send their results. If we haven't finished refreshEffTopo because 
of
+   * which updatingTopo is true, we can't add the new msgs if the #netty 
threads
+   * is k All k threads are waiting to add data. Single user thread that is
+   * waiting for ACK does not come around to refreshEffTopo and we are
+   * deadlocked because there aren't enough netty threads to dispatch msgs to
+   * the application. Hence the stage
+   */
+  private final class DataHandlingStageHandler implements 
EventHandler<GroupCommunicationMessage> {
+    @Override
+    public void onNext(final GroupCommunicationMessage dataMsg) {
+      LOG.entering("OperatorTopologyImpl.DataHandlingStageHandler", "onNext", 
new Object[]{getQualifiedName(),
+          dataMsg});
+      LOG.finest(getQualifiedName() + "Waiting to acquire topoLock");
+      synchronized (topologyLock) {
+        LOG.finest(getQualifiedName() + "Aqcuired topoLock");
+        while (updatingTopo.get()) {
+          try {
+            LOG.finest(getQualifiedName() + "Topology is being updated. 
Released topoLock, Waiting on topoLock");
+            topologyLock.wait();
+            LOG.finest(getQualifiedName() + "Aqcuired topoLock");
+          } catch (final InterruptedException e) {
+            throw new RuntimeException("InterruptedException while data 
handling"
+                + "stage was waiting for updatingTopo to become false", e);
+          }
+        }
+        if (effectiveTopology != null) {
+          LOG.finest(getQualifiedName() + "Non-null 
effectiveTopo.addAsData(msg)");
+          effectiveTopology.addAsData(dataMsg);
+        } else {
+          LOG.fine("Received a data message before effective topology was 
setup");
+        }
+        LOG.finest(getQualifiedName() + "Released topoLock");
+      }
+      LOG.exiting("OperatorTopologyImpl.DataHandlingStageHandler", "onNext",
+          Arrays.toString(new Object[]{getQualifiedName(), dataMsg}));
+    }
+  }
+
+  private final class BaseTopologyUpdateHandler implements 
EventHandler<GroupCommunicationMessage> {
+    @Override
+    public void onNext(final GroupCommunicationMessage msg) {
+      assert (msg.getType() == 
ReefNetworkGroupCommProtos.GroupCommMessage.Type.UpdateTopology);
+      assert (effectiveTopology != null);
+      LOG.entering("OperatorTopologyImpl.BaseTopologyUpdateHandler", "onNext", 
new Object[]{getQualifiedName(), msg});
+      LOG.finest(getQualifiedName() + "Waiting to acquire topoLock");
+      synchronized (topologyLock) {
+        LOG.finest(getQualifiedName() + "Acquired topoLock");
+        LOG.finest(getQualifiedName() + "Releasing topoLoackAcquired CDL");
+        topologyLockAquired.countDown();
+        try {
+          updateBaseTopology();
+          LOG.finest(getQualifiedName() + "Completed updating base & effective 
topologies");
+        } catch (final ParentDeadException e) {
+          throw new RuntimeException(getQualifiedName() + 
"BaseTopologyUpdateStage: Unexpected ParentDeadException", e);
+        }
+        updatingTopo.set(false);
+        LOG.finest(getQualifiedName() + "Topology update complete. Notifying 
waiting threads");
+        topologyLock.notifyAll();
+        LOG.finest(getQualifiedName() + "Released topoLock");
+      }
+      LOG.exiting("OperatorTopologyImpl.BaseTopologyUpdateHandler", "onNext",
+          Arrays.toString(new Object[]{getQualifiedName(), msg}));
+    }
+  }
+}

Reply via email to