http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/exception/ParentDeadException.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/exception/ParentDeadException.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/exception/ParentDeadException.java
index bb0dbff..2d63541 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/exception/ParentDeadException.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/exception/ParentDeadException.java
@@ -41,7 +41,8 @@ public class ParentDeadException extends Exception {
     super(message, cause);
   }
 
-  public ParentDeadException(final String message, final Throwable cause, final boolean enableSuppression, final boolean writableStackTrace) {
+  public ParentDeadException(final String message, final Throwable cause,
+                             final boolean enableSuppression, final boolean writableStackTrace) {
     super(message, cause, enableSuppression, writableStackTrace);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/GroupCommNetworkHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/GroupCommNetworkHandler.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/GroupCommNetworkHandler.java
index 3272e57..71cf9b5 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/GroupCommNetworkHandler.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/task/GroupCommNetworkHandler.java
@@ -34,5 +34,6 @@ import org.apache.reef.wake.EventHandler;
 @DefaultImplementation(value = GroupCommNetworkHandlerImpl.class)
 public interface GroupCommNetworkHandler extends EventHandler<Message<GroupCommunicationMessage>> {
 
-  void register(Class<? extends Name<String>> groupName, EventHandler<GroupCommunicationMessage> commGroupNetworkHandler);
+  void register(Class<? extends Name<String>> groupName,
+                EventHandler<GroupCommunicationMessage> commGroupNetworkHandler);
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupCommunicationMessage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupCommunicationMessage.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupCommunicationMessage.java
index 03c7ec8..29c5cef 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupCommunicationMessage.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/GroupCommunicationMessage.java
@@ -117,7 +117,8 @@ public class GroupCommunicationMessage {
 
   @Override
   public String toString() {
-    return "[" + msgType + " from " + getSource() + " to " + getDestination() + " for " + simpleGroupName + ":" + simpleOperName + "]";
+    return "[" + msgType + " from " + getSource() + " to " + getDestination() + " for " + simpleGroupName + ":" +
+        simpleOperName + "]";
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java
index ed64c7e..91d6564 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/CommunicationGroupDriverImpl.java
@@ -113,7 +113,8 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
   @Override
   public CommunicationGroupDriver addBroadcast(final Class<? extends Name<String>> operatorName,
                                                final BroadcastOperatorSpec spec) {
-    LOG.entering("CommunicationGroupDriverImpl", "addBroadcast", new Object[]{getQualifiedName(), Utils.simpleName(operatorName), spec});
+    LOG.entering("CommunicationGroupDriverImpl", "addBroadcast",
+        new Object[]{getQualifiedName(), Utils.simpleName(operatorName), spec});
     if (finalised) {
       throw new IllegalStateException("Can't add more operators to a finalised spec");
     }
@@ -122,14 +123,16 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
     topology.setRootTask(spec.getSenderId());
     topology.setOperatorSpecification(spec);
     topologies.put(operatorName, topology);
-    LOG.exiting("CommunicationGroupDriverImpl", "addBroadcast", Arrays.toString(new Object[]{getQualifiedName(), Utils.simpleName(operatorName), " added"}));
+    LOG.exiting("CommunicationGroupDriverImpl", "addBroadcast",
+        Arrays.toString(new Object[]{getQualifiedName(), Utils.simpleName(operatorName), " added"}));
     return this;
   }
 
   @Override
   public CommunicationGroupDriver addReduce(final Class<? extends Name<String>> operatorName,
                                             final ReduceOperatorSpec spec) {
-    LOG.entering("CommunicationGroupDriverImpl", "addReduce", new Object[]{getQualifiedName(), Utils.simpleName(operatorName), spec});
+    LOG.entering("CommunicationGroupDriverImpl", "addReduce",
+        new Object[]{getQualifiedName(), Utils.simpleName(operatorName), spec});
     if (finalised) {
       throw new IllegalStateException("Can't add more operators to a finalised spec");
     }
@@ -139,13 +142,15 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
     topology.setRootTask(spec.getReceiverId());
     topology.setOperatorSpecification(spec);
     topologies.put(operatorName, topology);
-    LOG.exiting("CommunicationGroupDriverImpl", "addReduce", Arrays.toString(new Object[]{getQualifiedName(), Utils.simpleName(operatorName), " added"}));
+    LOG.exiting("CommunicationGroupDriverImpl", "addReduce",
+        Arrays.toString(new Object[]{getQualifiedName(), Utils.simpleName(operatorName), " added"}));
     return this;
   }
 
   @Override
   public Configuration getTaskConfiguration(final Configuration taskConf) {
-    LOG.entering("CommunicationGroupDriverImpl", "getTaskConfiguration", new Object[]{getQualifiedName(), confSerializer.toString(taskConf)});
+    LOG.entering("CommunicationGroupDriverImpl", "getTaskConfiguration",
+        new Object[]{getQualifiedName(), confSerializer.toString(taskConf)});
     final JavaConfigurationBuilder jcb = Tang.Factory.getTang().newConfigurationBuilder();
     final String taskId = taskId(taskConf);
     if (perTaskState.containsKey(taskId)) {
@@ -182,7 +187,8 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
       return null;
     }
     final Configuration configuration = jcb.build();
-    LOG.exiting("CommunicationGroupDriverImpl", "getTaskConfiguration", Arrays.toString(new Object[]{getQualifiedName(), confSerializer.toString(configuration)}));
+    LOG.exiting("CommunicationGroupDriverImpl", "getTaskConfiguration",
+        Arrays.toString(new Object[]{getQualifiedName(), confSerializer.toString(configuration)}));
     return configuration;
   }
 
@@ -192,14 +198,17 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
     if (!taskState.equals(TaskState.NOT_STARTED)) {
       LOG.finest(getQualifiedName() + taskId + " has started.");
       if (taskState.equals(TaskState.RUNNING)) {
-        LOG.exiting("CommunicationGroupDriverImpl", "cantGetConfig", Arrays.toString(new Object[]{true, getQualifiedName(), taskId, " is running. We can't get config"}));
+        LOG.exiting("CommunicationGroupDriverImpl", "cantGetConfig",
+            Arrays.toString(new Object[]{true, getQualifiedName(), taskId, " is running. We can't get config"}));
         return true;
       } else {
-        LOG.exiting("CommunicationGroupDriverImpl", "cantGetConfig", Arrays.toString(new Object[]{false, getQualifiedName(), taskId, " has failed. We can get config"}));
+        LOG.exiting("CommunicationGroupDriverImpl", "cantGetConfig",
+            Arrays.toString(new Object[]{false, getQualifiedName(), taskId, " has failed. We can get config"}));
         return false;
       }
     } else {
-      LOG.exiting("CommunicationGroupDriverImpl", "cantGetConfig", Arrays.toString(new Object[]{false, getQualifiedName(), taskId, " has not started. We can get config"}));
+      LOG.exiting("CommunicationGroupDriverImpl", "cantGetConfig",
+          Arrays.toString(new Object[]{false, getQualifiedName(), taskId, " has not started. We can get config"}));
       return false;
     }
   }
@@ -211,7 +220,8 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
 
   @Override
   public void addTask(final Configuration partialTaskConf) {
-    LOG.entering("CommunicationGroupDriverImpl", "addTask", new Object[]{getQualifiedName(), confSerializer.toString(partialTaskConf)});
+    LOG.entering("CommunicationGroupDriverImpl", "addTask",
+        new Object[]{getQualifiedName(), confSerializer.toString(partialTaskConf)});
     final String taskId = taskId(partialTaskConf);
     LOG.finest(getQualifiedName() + "AddTask(" + taskId + "). Waiting to acquire toBeRemovedLock");
     synchronized (toBeRemovedLock) {
@@ -236,7 +246,8 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
       LOG.finest(getQualifiedName() + "Released topologiesLock");
     }
     LOG.fine(getQualifiedName() + "Added " + taskId + " to topology");
-    LOG.exiting("CommunicationGroupDriverImpl", "addTask", Arrays.toString(new Object[]{getQualifiedName(), "Added task: ", taskId}));
+    LOG.exiting("CommunicationGroupDriverImpl", "addTask",
+        Arrays.toString(new Object[]{getQualifiedName(), "Added task: ", taskId}));
   }
 
   public void removeTask(final String taskId) {
@@ -261,7 +272,8 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
       LOG.finest(getQualifiedName() + "Released toBeRemovedLock");
     }
     LOG.fine(getQualifiedName() + "Removed " + taskId + " to topology");
-    LOG.exiting("CommunicationGroupDriverImpl", "removeTask", Arrays.toString(new Object[]{getQualifiedName(), "Removed task: ", taskId}));
+    LOG.exiting("CommunicationGroupDriverImpl", "removeTask",
+        Arrays.toString(new Object[]{getQualifiedName(), "Removed task: ", taskId}));
   }
 
   public void runTask(final String id) {
@@ -290,10 +302,12 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
       LOG.finest(getQualifiedName() + "Released yetToRunLock");
     }
     if (nonMember) {
-      LOG.exiting("CommunicationGroupDriverImpl", "runTask", getQualifiedName() + id + " does not belong to this communication group. Ignoring");
+      LOG.exiting("CommunicationGroupDriverImpl", "runTask",
+          getQualifiedName() + id + " does not belong to this communication group. Ignoring");
     } else {
       LOG.fine(getQualifiedName() + "Status of task " + id + " changed to RUNNING");
-      LOG.exiting("CommunicationGroupDriverImpl", "runTask", Arrays.toString(new Object[]{getQualifiedName(), "Set running complete on task ", id}));
+      LOG.exiting("CommunicationGroupDriverImpl", "runTask",
+          Arrays.toString(new Object[]{getQualifiedName(), "Set running complete on task ", id}));
     }
   }
 
@@ -342,7 +356,8 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
       LOG.finest(getQualifiedName() + "Released configLock");
     }
     LOG.fine(getQualifiedName() + "Status of task " + id + " changed to FAILED");
-    LOG.exiting("CommunicationGroupDriverImpl", "failTask", Arrays.toString(new Object[]{getQualifiedName(), "Set failed complete on task ", id}));
+    LOG.exiting("CommunicationGroupDriverImpl", "failTask",
+        Arrays.toString(new Object[]{getQualifiedName(), "Set failed complete on task ", id}));
   }
 
   private boolean cantFailTask(final String taskId) {
@@ -351,14 +366,18 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
     if (!taskState.equals(TaskState.NOT_STARTED)) {
       LOG.finest(getQualifiedName() + taskId + " has started.");
       if (!taskState.equals(TaskState.RUNNING)) {
-        LOG.exiting("CommunicationGroupDriverImpl", "cantFailTask", Arrays.toString(new Object[]{true, getQualifiedName(), taskId, " is not running yet. Can't set failure"}));
+        LOG.exiting("CommunicationGroupDriverImpl", "cantFailTask",
+            Arrays.toString(new Object[]{true, getQualifiedName(), taskId, " is not running yet. Can't set failure"}));
         return true;
       } else {
-        LOG.exiting("CommunicationGroupDriverImpl", "cantFailTask", Arrays.toString(new Object[]{false, getQualifiedName(), taskId, " is running. Can set failure"}));
+        LOG.exiting("CommunicationGroupDriverImpl", "cantFailTask",
+            Arrays.toString(new Object[]{false, getQualifiedName(), taskId, " is running. Can set failure"}));
         return false;
       }
     } else {
-      LOG.exiting("CommunicationGroupDriverImpl", "cantFailTask", Arrays.toString(new Object[]{true, getQualifiedName(), taskId, " has not started. We can't fail a task that hasn't started"}));
+      LOG.exiting("CommunicationGroupDriverImpl", "cantFailTask",
+          Arrays.toString(new Object[]{true, getQualifiedName(), taskId,
+              " has not started. We can't fail a task that hasn't started"}));
       return true;
     }
   }
@@ -369,8 +388,8 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
     final Class<? extends Name<String>> operName = indMsg.getOperName();
     final MsgKey key = new MsgKey(msg);
     if (msgQue.contains(key, indMsg)) {
-      throw new RuntimeException(getQualifiedName() + "MsgQue already contains " + msg.getType() + " msg for " + key + " in "
-          + Utils.simpleName(operName));
+      throw new RuntimeException(getQualifiedName() + "MsgQue already contains " + msg.getType() + " msg for " + key +
+          " in " + Utils.simpleName(operName));
     }
     LOG.finest(getQualifiedName() + "Adding msg to que");
     msgQue.add(key, indMsg);
@@ -382,7 +401,8 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
       }
       LOG.finest(getQualifiedName() + "All msgs processed and removed");
     }
-    LOG.exiting("CommunicationGroupDriverImpl", "queNProcessMsg", Arrays.toString(new Object[]{getQualifiedName(), "Que & Process done for: ", msg}));
+    LOG.exiting("CommunicationGroupDriverImpl", "queNProcessMsg",
+        Arrays.toString(new Object[]{getQualifiedName(), "Que & Process done for: ", msg}));
   }
 
   private boolean isMsgVersionOk(final GroupCommunicationMessage msg) {
@@ -393,7 +413,8 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
       final int expSrcVersion = topologies.get(Utils.getClass(msg.getOperatorname())).getNodeVersion(srcId);
 
       final boolean srcVersionChk = chkVersion(rcvSrcVersion, expSrcVersion, "Src Version Check: ");
-      LOG.exiting("CommunicationGroupDriverImpl", "isMsgVersionOk", Arrays.toString(new Object[]{srcVersionChk, getQualifiedName(), msg}));
+      LOG.exiting("CommunicationGroupDriverImpl", "isMsgVersionOk",
+          Arrays.toString(new Object[]{srcVersionChk, getQualifiedName(), msg}));
       return srcVersionChk;
     } else {
       throw new RuntimeException(getQualifiedName() + "can only deal with versioned msgs");
@@ -423,8 +444,8 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
         return;
       }
       if (initializing.get() || msg.getType().equals(ReefNetworkGroupCommProtos.GroupCommMessage.Type.UpdateTopology)) {
-        LOG.fine(getQualifiedName() + msg.getSimpleOperName() + ": Waiting for all required(" + allTasksAdded.getInitialCount() +
-            ") nodes to run");
+        LOG.fine(getQualifiedName() + msg.getSimpleOperName() + ": Waiting for all required(" +
+            allTasksAdded.getInitialCount() + ") nodes to run");
         allTasksAdded.await();
         LOG.fine(getQualifiedName() + msg.getSimpleOperName() + ": All required(" + allTasksAdded.getInitialCount() +
             ") nodes are running");
@@ -433,7 +454,8 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
       queNProcessMsg(msg);
       LOG.finest(getQualifiedName() + "Released topologiesLock");
     }
-    LOG.exiting("CommunicationGroupDriverImpl", "processMsg", Arrays.toString(new Object[]{getQualifiedName(), "ProcessMsg done for: ", msg}));
+    LOG.exiting("CommunicationGroupDriverImpl", "processMsg",
+        Arrays.toString(new Object[]{getQualifiedName(), "ProcessMsg done for: ", msg}));
   }
 
   private String taskId(final Configuration partialTaskConf) {
@@ -441,7 +463,8 @@ public class CommunicationGroupDriverImpl implements CommunicationGroupDriver {
       final Injector injector = Tang.Factory.getTang().newInjector(partialTaskConf);
       return injector.getNamedInstance(TaskConfigurationOptions.Identifier.class);
     } catch (final InjectionException e) {
-      throw new RuntimeException(getQualifiedName() + "Injection exception while extracting taskId from partialTaskConf", e);
+      throw new RuntimeException(getQualifiedName() +
+          "Injection exception while extracting taskId from partialTaskConf", e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/FlatTopology.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/FlatTopology.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/FlatTopology.java
index 497d032..e2e1093 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/FlatTopology.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/FlatTopology.java
@@ -71,7 +71,8 @@ public class FlatTopology implements Topology {
   private final ConcurrentMap<String, TaskNode> nodes = new ConcurrentSkipListMap<>();
 
   public FlatTopology(final EStage<GroupCommunicationMessage> senderStage,
-                      final Class<? extends Name<String>> groupName, final Class<? extends Name<String>> operatorName,
+                      final Class<? extends Name<String>> groupName,
+                      final Class<? extends Name<String>> operatorName,
                       final String driverId, final int numberOfTasks) {
     this.senderStage = senderStage;
     this.groupName = groupName;
@@ -279,7 +280,8 @@ public class FlatTopology implements Topology {
     }
     for (final TaskNode node : toBeUpdatedNodes) {
       node.updatingTopology();
-      senderStage.onNext(Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.UpdateTopology, driverId, 0, node.getTaskId(),
+      senderStage.onNext(Utils.bldVersionedGCM(groupName, operName,
+          ReefNetworkGroupCommProtos.GroupCommMessage.Type.UpdateTopology, driverId, 0, node.getTaskId(),
           node.getVersion(), Utils.EMPTY_BYTE_ARR));
     }
     nodeTopologyUpdateWaitStage.onNext(toBeUpdatedNodes);
@@ -297,7 +299,8 @@ public class FlatTopology implements Topology {
     }
     final GroupChanges changes = new GroupChangesImpl(hasTopologyChanged);
     final Codec<GroupChanges> changesCodec = new GroupChangesCodec();
-    senderStage.onNext(Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyChanges, driverId, 0, dstId, getNodeVersion(dstId),
+    senderStage.onNext(Utils.bldVersionedGCM(groupName, operName,
+        ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyChanges, driverId, 0, dstId, getNodeVersion(dstId),
         changesCodec.encode(changes)));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java
index 089bfb3..3e08b07 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java
@@ -206,11 +206,13 @@ public class GroupCommDriverImpl implements GroupCommServiceDriver {
   @Override
   public CommunicationGroupDriver newCommunicationGroup(final Class<? extends Name<String>> groupName,
                                                         final int numberOfTasks) {
-    LOG.entering("GroupCommDriverImpl", "newCommunicationGroup", new Object[]{Utils.simpleName(groupName), numberOfTasks});
+    LOG.entering("GroupCommDriverImpl", "newCommunicationGroup",
+        new Object[]{Utils.simpleName(groupName), numberOfTasks});
     final BroadcastingEventHandler<RunningTask> commGroupRunningTaskHandler = new BroadcastingEventHandler<>();
     final BroadcastingEventHandler<FailedTask> commGroupFailedTaskHandler = new BroadcastingEventHandler<>();
     final BroadcastingEventHandler<FailedEvaluator> commGroupFailedEvaluatorHandler = new BroadcastingEventHandler<>();
-    final BroadcastingEventHandler<GroupCommunicationMessage> commGroupMessageHandler = new BroadcastingEventHandler<>();
+    final BroadcastingEventHandler<GroupCommunicationMessage> commGroupMessageHandler =
+        new BroadcastingEventHandler<>();
     final CommunicationGroupDriver commGroupDriver = new CommunicationGroupDriverImpl(groupName, confSerializer,
         senderStage,
         commGroupRunningTaskHandler,
@@ -222,7 +224,8 @@ public class GroupCommDriverImpl implements GroupCommServiceDriver {
     groupCommRunningTaskHandler.addHandler(commGroupRunningTaskHandler);
     groupCommFailedTaskHandler.addHandler(commGroupFailedTaskHandler);
     groupCommMessageHandler.addHandler(groupName, commGroupMessageHandler);
-    LOG.exiting("GroupCommDriverImpl", "newCommunicationGroup", "Created communication group: " + Utils.simpleName(groupName));
+    LOG.exiting("GroupCommDriverImpl", "newCommunicationGroup",
+        "Created communication group: " + Utils.simpleName(groupName));
     return commGroupDriver;
   }
 
@@ -311,7 +314,7 @@ public class GroupCommDriverImpl implements GroupCommServiceDriver {
   @Override
   public EStage<FailedEvaluator> getGroupCommFailedEvaluatorStage() {
     LOG.entering("GroupCommDriverImpl", "getGroupCommFailedEvaluatorStage");
-    LOG.exiting("GroupCommDriverImpl", "getGroupCommFailedEvaluatorStage", "Returning GroupCommFaileEvaluatorStage");
+    LOG.exiting("GroupCommDriverImpl", "getGroupCommFailedEvaluatorStage", "Returning GroupCommFailedEvaluatorStage");
     return groupCommFailedEvaluatorStage;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeImpl.java
index 1589bd7..f1a70be 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeImpl.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeImpl.java
@@ -56,7 +56,8 @@ public class TaskNodeImpl implements TaskNode {
   private final AtomicInteger version = new AtomicInteger(0);
 
   public TaskNodeImpl(final EStage<GroupCommunicationMessage> senderStage,
-                      final Class<? extends Name<String>> groupName, final Class<? extends Name<String>> operatorName,
+                      final Class<? extends Name<String>> groupName,
+                      final Class<? extends Name<String>> operatorName,
                       final String taskId, final String driverId, final boolean isRoot) {
     this.senderStage = senderStage;
     this.groupName = groupName;
@@ -102,7 +103,8 @@ public class TaskNodeImpl implements TaskNode {
     LOG.entering("TaskNodeImpl", "onFailedTask", getQualifiedName());
     if (!running.compareAndSet(true, false)) {
       LOG.fine(getQualifiedName() + "Trying to set failed on an already failed task. Something fishy!!!");
-      LOG.exiting("TaskNodeImpl", "onFailedTask", getQualifiedName() + "Trying to set failed on an already failed task. Something fishy!!!");
+      LOG.exiting("TaskNodeImpl", "onFailedTask", getQualifiedName() +
+          "Trying to set failed on an already failed task. Something fishy!!!");
       return;
     }
     taskNodeStatus.clearStateAndReleaseLocks();
@@ -129,13 +131,15 @@ public class TaskNodeImpl implements TaskNode {
     LOG.entering("TaskNodeImpl", "onRunningTask", getQualifiedName());
     if (!running.compareAndSet(false, true)) {
       LOG.fine(getQualifiedName() + "Trying to set running on an already running task. Something fishy!!!");
-      LOG.exiting("TaskNodeImpl", "onRunningTask", getQualifiedName() + "Trying to set running on an already running task. Something fishy!!!");
+      LOG.exiting("TaskNodeImpl", "onRunningTask", getQualifiedName() +
+          "Trying to set running on an already running task. Something fishy!!!");
       return;
     }
     final int version = this.version.get();
     LOG.finest(getQualifiedName() + "Changed status to running version-" + version);
     if (parent != null && parent.isRunning()) {
-      final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentAdd, parent.getTaskId(),
+      final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName,
+          ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentAdd, parent.getTaskId(),
           parent.getVersion(), taskId,
           version, Utils.EMPTY_BYTE_ARR);
       taskNodeStatus.expectAckFor(gcm.getType(), gcm.getSrcid());
@@ -146,7 +150,8 @@ public class TaskNodeImpl implements TaskNode {
     }
     for (final TaskNode child : children) {
       if (child.isRunning()) {
-        final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, child.getTaskId(),
+        final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName,
+            ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, child.getTaskId(),
             child.getVersion(), taskId, version,
             Utils.EMPTY_BYTE_ARR);
         taskNodeStatus.expectAckFor(gcm.getType(), gcm.getSrcid());
@@ -167,7 +172,8 @@ public class TaskNodeImpl implements TaskNode {
     if (parent != null && parent.isRunning()) {
       final int parentVersion = parent.getVersion();
       final String parentTaskId = parent.getTaskId();
-      final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentAdd, parentTaskId,
+      final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName,
+          ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentAdd, parentTaskId,
           parentVersion, taskId, version.get(),
           Utils.EMPTY_BYTE_ARR);
       taskNodeStatus.expectAckFor(gcm.getType(), gcm.getSrcid());
@@ -186,7 +192,8 @@ public class TaskNodeImpl implements TaskNode {
       final int parentVersion = parent.getVersion();
       final String parentTaskId = parent.getTaskId();
       taskNodeStatus.updateFailureOf(parent.getTaskId());
-      final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentDead, parentTaskId,
+      final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName,
+          ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentDead, parentTaskId,
           parentVersion, taskId, version.get(),
           Utils.EMPTY_BYTE_ARR);
       taskNodeStatus.expectAckFor(gcm.getType(), gcm.getSrcid());
@@ -203,7 +210,8 @@ public class TaskNodeImpl implements TaskNode {
     final TaskNode childTask = findTask(childId);
     if (childTask != null && childTask.isRunning()) {
       final int childVersion = childTask.getVersion();
-      final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, childId,
+      final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName,
+          ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, childId,
           childVersion, taskId, version.get(),
           Utils.EMPTY_BYTE_ARR);
       taskNodeStatus.expectAckFor(gcm.getType(), gcm.getSrcid());
@@ -223,13 +231,15 @@ public class TaskNodeImpl implements TaskNode {
     if (childTask != null) {
       final int childVersion = childTask.getVersion();
       taskNodeStatus.updateFailureOf(childId);
-      final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildDead, childId,
+      final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName,
+          ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildDead, childId,
           childVersion, taskId, version.get(),
           Utils.EMPTY_BYTE_ARR);
       taskNodeStatus.expectAckFor(gcm.getType(), gcm.getSrcid());
       senderStage.onNext(gcm);
     } else {
-      throw new RuntimeException(getQualifiedName() + "Don't expect task for " + childId + " to be null. Something wrong");
+      throw new RuntimeException(getQualifiedName() + "Don't expect task for " + childId +
+          " to be null. Something wrong");
     }
     LOG.exiting("TaskNodeImpl", "onChildDead", getQualifiedName() + childId);
   }
@@ -327,7 +337,8 @@ public class TaskNodeImpl implements TaskNode {
   private void sendTopoSetupMsg() {
     LOG.entering("TaskNodeImpl", "sendTopoSetupMsg", getQualifiedName() + taskId);
     LOG.fine(getQualifiedName() + "is an active participant in the topology");
-    senderStage.onNext(Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologySetup, driverId, 0, taskId,
+    senderStage.onNext(Utils.bldVersionedGCM(groupName, operName,
+        ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologySetup, driverId, 0, taskId,
         version.get(), Utils.EMPTY_BYTE_ARR));
     taskNodeStatus.onTopologySetupMessageSent();
     final boolean sentAlready = !topoSetupSent.compareAndSet(false, true);
@@ -379,28 +390,35 @@ public class TaskNodeImpl implements TaskNode {
   private boolean parentActive() {
     LOG.entering("TaskNodeImpl", "parentActive", getQualifiedName());
     if (isRoot) {
-      LOG.exiting("TaskNodeImpl", "parentActive", Arrays.toString(new Object[]{true, getQualifiedName(), "I am root. Will never have parent. So signalling active"}));
+      LOG.exiting("TaskNodeImpl", "parentActive",
+          Arrays.toString(new Object[]{true, getQualifiedName(),
+              "I am root. Will never have parent. So signalling active"}));
       return true;
     }
     if (isNeighborActive(parent.getTaskId())) {
-      LOG.exiting("TaskNodeImpl", "parentActive", Arrays.toString(new Object[]{true, getQualifiedName(), parent, " is an active neghbor"}));
+      LOG.exiting("TaskNodeImpl", "parentActive",
+          Arrays.toString(new Object[]{true, getQualifiedName(), parent, " is an active neighbor"}));
       return true;
     }
-    LOG.exiting("TaskNodeImpl", "parentActive", getQualifiedName() + "Neither root Nor is " + parent + " an active neghbor");
+    LOG.exiting("TaskNodeImpl", "parentActive",
+        getQualifiedName() + "Neither root Nor is " + parent + " an active neighbor");
     return false;
   }
 
   private boolean activeNeighborOfParent() {
     LOG.entering("TaskNodeImpl", "activeNeighborOfParent", getQualifiedName());
     if (isRoot) {
-      LOG.exiting("TaskNodeImpl", "activeNeighborOfParent", Arrays.toString(new Object[]{true, getQualifiedName(), "I am root. Will never have parent. So signalling active"}));
+      LOG.exiting("TaskNodeImpl", "activeNeighborOfParent", Arrays.toString(new Object[]{true, getQualifiedName(),
+          "I am root. Will never have parent. So signalling active"}));
       return true;
     }
     if (parent.isNeighborActive(taskId)) {
-      LOG.exiting("TaskNodeImpl", "activeNeighborOfParent", Arrays.toString(new Object[]{true, getQualifiedName(), "I am an active neighbor of parent ", parent}));
+      LOG.exiting("TaskNodeImpl", "activeNeighborOfParent", Arrays.toString(new Object[]{true, getQualifiedName(),
+          "I am an active neighbor of parent ", parent}));
       return true;
     }
-    LOG.exiting("TaskNodeImpl", "activeNeighborOfParent", Arrays.toString(new Object[]{false, getQualifiedName(), "Neither is parent null Nor am I an active neighbor of parent ", parent}));
+    LOG.exiting("TaskNodeImpl", "activeNeighborOfParent", Arrays.toString(new Object[]{false, getQualifiedName(),
+        "Neither is parent null Nor am I an active neighbor of parent ", parent}));
     return false;
   }
 
@@ -409,11 +427,13 @@ public class TaskNodeImpl implements TaskNode {
     for (final TaskNode child : children) {
       final String childId = child.getTaskId();
       if (child.isRunning() && !isNeighborActive(childId)) {
-        LOG.exiting("TaskNodeImpl", "allChildrenActive", Arrays.toString(new Object[]{false, getQualifiedName(), childId, " not active yet"}));
+        LOG.exiting("TaskNodeImpl", "allChildrenActive",
+            Arrays.toString(new Object[]{false, getQualifiedName(), childId, " not active yet"}));
         return false;
       }
     }
-    LOG.exiting("TaskNodeImpl", "allChildrenActive", Arrays.toString(new Object[]{true, getQualifiedName(), "All children active"}));
+    LOG.exiting("TaskNodeImpl", "allChildrenActive",
+        Arrays.toString(new Object[]{true, getQualifiedName(), "All children active"}));
     return true;
   }
 
@@ -421,11 +441,13 @@ public class TaskNodeImpl implements TaskNode {
     LOG.entering("TaskNodeImpl", "activeNeighborOfAllChildren", getQualifiedName());
     for (final TaskNode child : children) {
       if (child.isRunning() && !child.isNeighborActive(taskId)) {
-        LOG.exiting("TaskNodeImpl", "activeNeighborOfAllChildren", Arrays.toString(new Object[]{false, getQualifiedName(), "Not an active neighbor of child ", child}));
+        LOG.exiting("TaskNodeImpl", "activeNeighborOfAllChildren",
+            Arrays.toString(new Object[]{false, getQualifiedName(), "Not an active neighbor of child ", child}));
         return false;
       }
     }
-    LOG.exiting("TaskNodeImpl", "activeNeighborOfAllChildren", Arrays.toString(new Object[]{true, getQualifiedName(), "Active neighbor of all children"}));
+    LOG.exiting("TaskNodeImpl", "activeNeighborOfAllChildren",
+        Arrays.toString(new Object[]{true, getQualifiedName(), "Active neighbor of all children"}));
     return true;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeStatusImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeStatusImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeStatusImpl.java
index 6503a92..12679b8 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeStatusImpl.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeStatusImpl.java
@@ -117,7 +117,8 @@ public class TaskNodeStatusImpl implements TaskNodeStatus {
     LOG.entering("TaskNodeStatusImpl", "expectAckFor", new Object[]{getQualifiedName(), msgType, srcId});
     LOG.finest(getQualifiedName() + "Adding " + srcId + " to sources");
     statusMap.add(msgType, srcId);
-    LOG.exiting("TaskNodeStatusImpl", "expectAckFor", getQualifiedName() + "Sources from which ACKs for " + msgType + " are expected: " + statusMap.get(msgType));
+    LOG.exiting("TaskNodeStatusImpl", "expectAckFor",
+        getQualifiedName() + "Sources from which ACKs for " + msgType + " are expected: " + statusMap.get(msgType));
   }
 
   @Override
@@ -236,7 +237,8 @@ public class TaskNodeStatusImpl implements TaskNodeStatus {
   }
 
   private String getQualifiedName() {
-    return Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + ":(" + taskId + "," + node.getVersion() + ") - ";
+    return Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + ":(" + taskId + "," +
+        node.getVersion() + ") - ";
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyUpdateWaitHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyUpdateWaitHandler.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyUpdateWaitHandler.java
index ad3e385..709f866 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyUpdateWaitHandler.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyUpdateWaitHandler.java
@@ -55,8 +55,10 @@ public class TopologyUpdateWaitHandler implements EventHandler<List<TaskNode>> {
    * will continue their regular operation
    */
   public TopologyUpdateWaitHandler(final EStage<GroupCommunicationMessage> senderStage,
-                                   final Class<? extends Name<String>> groupName, final Class<? extends Name<String>> operName,
-                                   final String driverId, final int driverVersion, final String dstId, final int dstVersion,
+                                   final Class<? extends Name<String>> groupName,
+                                   final Class<? extends Name<String>> operName,
+                                   final String driverId, final int driverVersion,
+                                   final String dstId, final int dstVersion,
                                    final String qualifiedName) {
     super();
     this.senderStage = senderStage;
@@ -86,7 +88,8 @@ public class TopologyUpdateWaitHandler implements EventHandler<List<TaskNode>> {
     LOG.finest(qualifiedName + "NodeTopologyUpdateWaitStage All to be updated nodes " + "have received TopologySetup");
     LOG.fine(qualifiedName + "All affected parts of the topology are in TopologyUpdate phase. Will send a note to ("
         + dstId + "," + dstVersion + ")");
-    senderStage.onNext(Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyUpdated, driverId, driverVersion, dstId,
+    senderStage.onNext(Utils.bldVersionedGCM(groupName, operName,
+        ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyUpdated, driverId, driverVersion, dstId,
         dstVersion, Utils.EMPTY_BYTE_ARR));
     LOG.exiting("TopologyUpdateWaitHandler", "onNext", qualifiedName);
   }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TreeTopology.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TreeTopology.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TreeTopology.java
index f0366ec..b0a940d 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TreeTopology.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TreeTopology.java
@@ -312,7 +312,8 @@ public class TreeTopology implements Topology {
     for (final TaskNode node : toBeUpdatedNodes) {
       node.updatingTopology();
       LOG.fine(getQualifiedName() + "Asking " + node + " to UpdateTopology");
-      senderStage.onNext(Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.UpdateTopology, driverId, 0, node.getTaskId(),
+      senderStage.onNext(Utils.bldVersionedGCM(groupName, operName,
+          ReefNetworkGroupCommProtos.GroupCommMessage.Type.UpdateTopology, driverId, 0, node.getTaskId(),
           node.getVersion(), Utils.EMPTY_BYTE_ARR));
     }
     nodeTopologyUpdateWaitStage.onNext(toBeUpdatedNodes);
@@ -334,7 +335,8 @@ public class TreeTopology implements Topology {
     final GroupChanges changes = new GroupChangesImpl(hasTopologyChanged);
     final Codec<GroupChanges> changesCodec = new GroupChangesCodec();
     LOG.fine(getQualifiedName() + "TopologyChanges: " + changes);
-    senderStage.onNext(Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyChanges, driverId, 0, dstId, getNodeVersion(dstId),
+    senderStage.onNext(Utils.bldVersionedGCM(groupName, operName,
+        ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyChanges, driverId, 0, dstId, getNodeVersion(dstId),
         changesCodec.encode(changes)));
     LOG.exiting("TreeTopology", "onTopologyChanges", getQualifiedName() + msg);
   }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommGroupNetworkHandlerImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommGroupNetworkHandlerImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommGroupNetworkHandlerImpl.java
index 40abe66..a5a0930 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommGroupNetworkHandlerImpl.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommGroupNetworkHandlerImpl.java
@@ -38,8 +38,10 @@ public class CommGroupNetworkHandlerImpl implements
 
   private static final Logger LOG = 
Logger.getLogger(CommGroupNetworkHandlerImpl.class.getName());
 
-  private final Map<Class<? extends Name<String>>, EventHandler<GroupCommunicationMessage>> operHandlers = new ConcurrentHashMap<>();
-  private final Map<Class<? extends Name<String>>, BlockingQueue<GroupCommunicationMessage>> topologyNotifications = new ConcurrentHashMap<>();
+  private final Map<Class<? extends Name<String>>, EventHandler<GroupCommunicationMessage>> operHandlers =
+      new ConcurrentHashMap<>();
+  private final Map<Class<? extends Name<String>>, BlockingQueue<GroupCommunicationMessage>> topologyNotifications =
+      new ConcurrentHashMap<>();
 
   @Inject
   public CommGroupNetworkHandlerImpl() {
@@ -50,7 +52,8 @@ public class CommGroupNetworkHandlerImpl implements
                        final EventHandler<GroupCommunicationMessage> operHandler) {
     LOG.entering("CommGroupNetworkHandlerImpl", "register", new Object[]{Utils.simpleName(operName), operHandler});
     operHandlers.put(operName, operHandler);
-    LOG.exiting("CommGroupNetworkHandlerImpl", "register", Arrays.toString(new Object[]{Utils.simpleName(operName), operHandler}));
+    LOG.exiting("CommGroupNetworkHandlerImpl", "register",
+        Arrays.toString(new Object[]{Utils.simpleName(operName), operHandler}));
   }
 
   @Override
@@ -65,7 +68,8 @@ public class CommGroupNetworkHandlerImpl implements
   public void onNext(final GroupCommunicationMessage msg) {
     LOG.entering("CommGroupNetworkHandlerImpl", "onNext", msg);
     final Class<? extends Name<String>> operName = Utils.getClass(msg.getOperatorname());
-    if (msg.getType() == ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyUpdated || msg.getType() == ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyChanges) {
+    if (msg.getType() == ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyUpdated ||
+        msg.getType() == ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyChanges) {
       topologyNotifications.get(operName).add(msg);
     } else {
       operHandlers.get(operName).onNext(msg);

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommunicationGroupClientImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommunicationGroupClientImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommunicationGroupClientImpl.java
index 1eacb03..f9143d9 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommunicationGroupClientImpl.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/CommunicationGroupClientImpl.java
@@ -205,7 +205,8 @@ public class CommunicationGroupClientImpl implements CommunicationGroupServiceCl
       final Class<? extends Name<String>> operName = op.getOperName();
       LOG.finest("Sending TopologyChanges msg to driver");
       try {
-        sender.send(Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyChanges, taskId, op.getVersion(), driverId,
+        sender.send(Utils.bldVersionedGCM(groupName, operName,
+            ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyChanges, taskId, op.getVersion(), driverId,
             0, Utils.EMPTY_BYTE_ARR));
       } catch (final NetworkException e) {
         throw new RuntimeException("NetworkException while sending GetTopologyChanges", e);
@@ -247,7 +248,8 @@ public class CommunicationGroupClientImpl implements CommunicationGroupServiceCl
     for (final GroupCommOperator op : operators.values()) {
       final Class<? extends Name<String>> operName = op.getOperName();
       try {
-        sender.send(Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.UpdateTopology, taskId, op.getVersion(), driverId,
+        sender.send(Utils.bldVersionedGCM(groupName, operName,
+            ReefNetworkGroupCommProtos.GroupCommMessage.Type.UpdateTopology, taskId, op.getVersion(), driverId,
             0, Utils.EMPTY_BYTE_ARR));
       } catch (final NetworkException e) {
         throw new RuntimeException("NetworkException while sending UpdateTopology", e);
@@ -277,7 +279,8 @@ public class CommunicationGroupClientImpl implements CommunicationGroupServiceCl
       } else {
         retVal = true;
       }
-      LOG.exiting("CommunicationGroupClientImpl", "isMsgVersionOk", Arrays.toString(new Object[]{retVal, getQualifiedName(), msg}));
+      LOG.exiting("CommunicationGroupClientImpl", "isMsgVersionOk",
+          Arrays.toString(new Object[]{retVal, getQualifiedName(), msg}));
       return retVal;
     } else {
       throw new RuntimeException(getQualifiedName() + "can only deal with versioned msgs");

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/GroupCommClientImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/GroupCommClientImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/GroupCommClientImpl.java
index 30165f9..4e635c9 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/GroupCommClientImpl.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/GroupCommClientImpl.java
@@ -45,7 +45,8 @@ import java.util.logging.Logger;
 public class GroupCommClientImpl implements GroupCommClient {
   private static final Logger LOG = 
Logger.getLogger(GroupCommClientImpl.class.getName());
 
-  private final Map<Class<? extends Name<String>>, CommunicationGroupServiceClient> communicationGroups = new HashMap<>();
+  private final Map<Class<? extends Name<String>>, CommunicationGroupServiceClient> communicationGroups =
+      new HashMap<>();
 
   @Inject
   public GroupCommClientImpl(

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/GroupCommNetworkHandlerImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/GroupCommNetworkHandlerImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/GroupCommNetworkHandlerImpl.java
index 867e548..e018832 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/GroupCommNetworkHandlerImpl.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/GroupCommNetworkHandlerImpl.java
@@ -35,7 +35,8 @@ public class GroupCommNetworkHandlerImpl implements GroupCommNetworkHandler {
 
   private static final Logger LOG = 
Logger.getLogger(GroupCommNetworkHandlerImpl.class.getName());
 
-  private final Map<Class<? extends Name<String>>, EventHandler<GroupCommunicationMessage>> commGroupHandlers = new ConcurrentHashMap<>();
+  private final Map<Class<? extends Name<String>>, EventHandler<GroupCommunicationMessage>> commGroupHandlers =
+      new ConcurrentHashMap<>();
 
   @Inject
   public GroupCommNetworkHandlerImpl() {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyImpl.java
index a5c6baa..7418d33 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyImpl.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyImpl.java
@@ -180,7 +180,8 @@ public class OperatorTopologyImpl implements OperatorTopology {
       } else {
         retVal = true;
       }
-      LOG.exiting("OperatorTopologyImpl", "isMsgVersionOk", Arrays.toString(new Object[]{retVal, getQualifiedName(), msg}));
+      LOG.exiting("OperatorTopologyImpl", "isMsgVersionOk",
+          Arrays.toString(new Object[]{retVal, getQualifiedName(), msg}));
       return retVal;
     } else {
       throw new RuntimeException(getQualifiedName() + "can only deal with versioned msgs");
@@ -195,21 +196,25 @@ public class OperatorTopologyImpl implements 
OperatorTopology {
   }
 
   @Override
-  public void sendToParent(final byte[] data, final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType) throws ParentDeadException {
+  public void sendToParent(final byte[] data, final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType)
+      throws ParentDeadException {
     LOG.entering("OperatorTopologyImpl", "sendToParent", new Object[]{getQualifiedName(), data, msgType});
     refreshEffectiveTopology();
     assert (effectiveTopology != null);
     effectiveTopology.sendToParent(data, msgType);
-    LOG.exiting("OperatorTopologyImpl", "sendToParent", Arrays.toString(new Object[]{getQualifiedName(), data, msgType}));
+    LOG.exiting("OperatorTopologyImpl", "sendToParent",
+        Arrays.toString(new Object[]{getQualifiedName(), data, msgType}));
   }
 
   @Override
-  public void sendToChildren(final byte[] data, final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType) throws ParentDeadException {
+  public void sendToChildren(final byte[] data, final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType)
+      throws ParentDeadException {
     LOG.entering("OperatorTopologyImpl", "sendToChildren", new Object[]{getQualifiedName(), data, msgType});
     refreshEffectiveTopology();
     assert (effectiveTopology != null);
     effectiveTopology.sendToChildren(data, msgType);
-    LOG.exiting("OperatorTopologyImpl", "sendToChildren", Arrays.toString(new Object[]{getQualifiedName(), data, msgType}));
+    LOG.exiting("OperatorTopologyImpl", "sendToChildren",
+        Arrays.toString(new Object[]{getQualifiedName(), data, msgType}));
   }
 
   @Override
@@ -223,7 +228,8 @@ public class OperatorTopologyImpl implements 
OperatorTopology {
   }
 
   @Override
-  public <T> T recvFromChildren(final Reduce.ReduceFunction<T> redFunc, final Codec<T> dataCodec) throws ParentDeadException {
+  public <T> T recvFromChildren(final Reduce.ReduceFunction<T> redFunc, final Codec<T> dataCodec)
+      throws ParentDeadException {
     LOG.entering("OperatorTopologyImpl", "recvFromChildren", getQualifiedName());
     refreshEffectiveTopology();
     assert (effectiveTopology != null);
@@ -288,9 +294,12 @@ public class OperatorTopologyImpl implements 
OperatorTopology {
         baseTopology.setChanges(true);
 
         LOG.finest(getQualifiedName() + "Waiting for ctrl msgs");
-        for (GroupCommunicationMessage msg = deltas.take(); msg.getType() != ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologySetup; msg = deltas.take()) {
+        for (GroupCommunicationMessage msg = deltas.take();
+             msg.getType() != ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologySetup;
+             msg = deltas.take()) {
           LOG.finest(getQualifiedName() + "Got " + msg.getType() + " msg from " + msg.getSrcid());
-          if (effectiveTopology == null && msg.getType() == ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentDead) {
+          if (effectiveTopology == null &&
+              msg.getType() == ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentDead) {
             /**
              * If effectiveTopology!=null, this method is being called from 
the BaseTopologyUpdateStage
              * And exception thrown will be caught by uncaughtExceptionHandler 
leading to System.exit
@@ -324,23 +333,28 @@ public class OperatorTopologyImpl implements 
OperatorTopology {
         final int srcVersion = msg.getSrcVersion();
         switch (msg.getType()) {
         case UpdateTopology:
-          sender.send(Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologySetup, selfId, this.version, driverId,
+          sender.send(Utils.bldVersionedGCM(groupName, operName,
+              ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologySetup, selfId, this.version, driverId,
                 srcVersion, Utils.EMPTY_BYTE_ARR));
           break;
         case ParentAdd:
-          sender.send(Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentAdded, selfId, this.version, srcId,
+          sender.send(Utils.bldVersionedGCM(groupName, operName,
+              ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentAdded, selfId, this.version, srcId,
                 srcVersion, Utils.EMPTY_BYTE_ARR), driverId);
           break;
         case ParentDead:
-          sender.send(Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentRemoved, selfId, this.version, srcId,
+          sender.send(Utils.bldVersionedGCM(groupName, operName,
+              ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentRemoved, selfId, this.version, srcId,
                 srcVersion, Utils.EMPTY_BYTE_ARR), driverId);
           break;
         case ChildAdd:
-          sender.send(Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdded, selfId, this.version, srcId,
+          sender.send(Utils.bldVersionedGCM(groupName, operName,
+              ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdded, selfId, this.version, srcId,
                 srcVersion, Utils.EMPTY_BYTE_ARR), driverId);
           break;
         case ChildDead:
-          sender.send(Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildRemoved, selfId, this.version, srcId,
+          sender.send(Utils.bldVersionedGCM(groupName, operName,
+              ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildRemoved, selfId, this.version, srcId,
                 srcVersion, Utils.EMPTY_BYTE_ARR), driverId);
           break;
         default:
@@ -379,7 +393,8 @@ public class OperatorTopologyImpl implements 
OperatorTopology {
     for (final GroupCommunicationMessage msg : deletionDeltasForUpdate) {
       final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType = 
msg.getType();
       if (msgType == 
ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentDead) {
-        throw new ParentDeadException(getQualifiedName() + "Parent dead. Current behavior is for the child to die too.");
+        throw new ParentDeadException(getQualifiedName() +
+            "Parent dead. Current behavior is for the child to die too.");
       }
     }
     LOG.exiting("OperatorTopologyImpl", "copyDeletionDeltas", 
Arrays.toString(new Object[]{getQualifiedName(),

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyStructImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyStructImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyStructImpl.java
index 90d3b7c..483ac1b 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyStructImpl.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyStructImpl.java
@@ -89,7 +89,8 @@ public class OperatorTopologyStructImpl implements OperatorTopologyStruct {
 
   @Override
   public String toString() {
-    return "OperatorTopologyStruct - " + Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + "(" + selfId + "," + version + ")";
+    return "OperatorTopologyStruct - " + Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) +
+        "(" + selfId + "," + version + ")";
   }
 
   @Override
@@ -130,7 +131,8 @@ public class OperatorTopologyStructImpl implements 
OperatorTopologyStruct {
   @Override
   public boolean hasChanges() {
     LOG.entering("OperatorTopologyStructImpl", "hasChanges", getQualifiedName());
-    LOG.exiting("OperatorTopologyStructImpl", "hasChanges", Arrays.toString(new Object[]{this.changes, getQualifiedName()}));
+    LOG.exiting("OperatorTopologyStructImpl", "hasChanges",
+        Arrays.toString(new Object[]{this.changes, getQualifiedName()}));
     return this.changes;
   }
 
@@ -166,11 +168,14 @@ public class OperatorTopologyStructImpl implements 
OperatorTopologyStruct {
     } else {
       retVal = findChild(srcId);
     }
-    LOG.exiting("OperatorTopologyStructImpl", "findNode", Arrays.toString(new Object[]{retVal, getQualifiedName(), srcId}));
+    LOG.exiting("OperatorTopologyStructImpl", "findNode",
+        Arrays.toString(new Object[]{retVal, getQualifiedName(), srcId}));
     return retVal;
   }
 
-  private void sendToNode(final byte[] data, final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType, final NodeStruct node) {
+  private void sendToNode(final byte[] data,
+                          final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType,
+                          final NodeStruct node) {
     LOG.entering("OperatorTopologyStructImpl", "sendToNode", new Object[]{getQualifiedName(), data, msgType, node});
     final String nodeId = node.getId();
     try {
@@ -190,7 +195,8 @@ public class OperatorTopologyStructImpl implements 
OperatorTopologyStruct {
         }
       }
 
-      sender.send(Utils.bldVersionedGCM(groupName, operName, msgType, selfId, version, nodeId, node.getVersion(), data));
+      sender.send(Utils.bldVersionedGCM(groupName, operName, msgType, selfId, version, nodeId, node.getVersion(),
+          data));
 
       if (data.length > SMALL_MSG_LENGTH) {
         LOG.finest(getQualifiedName() + "Msg too big. Will wait for ACK before 
queing up one more msg");
@@ -230,8 +236,8 @@ public class OperatorTopologyStructImpl implements 
OperatorTopologyStruct {
         LOG.fine(msg);
       }
     }
-    LOG.exiting("OperatorTopologyStructImpl", "receiveFromNode", Arrays.toString(new Object[]{retVal, getQualifiedName(),
-        node, remove}));
+    LOG.exiting("OperatorTopologyStructImpl", "receiveFromNode",
+        Arrays.toString(new Object[]{retVal, getQualifiedName(), node, remove}));
     return retVal;
   }
 
@@ -320,8 +326,8 @@ public class OperatorTopologyStructImpl implements 
OperatorTopologyStruct {
       childrenToRcvFrom.remove(child.getId());
     }
     final T retVal = retLst.isEmpty() ? null : retLst.get(0);
-    LOG.exiting("OperatorTopologyStructImpl", "recvFromChildren", Arrays.toString(new Object[]{retVal, getQualifiedName(),
-        redFunc, dataCodec}));
+    LOG.exiting("OperatorTopologyStructImpl", "recvFromChildren",
+        Arrays.toString(new Object[]{retVal, getQualifiedName(), redFunc, dataCodec}));
     return retVal;
   }
 
@@ -358,11 +364,10 @@ public class OperatorTopologyStructImpl implements 
OperatorTopologyStruct {
     LOG.entering("OperatorTopologyStructImpl", "addedToDeadMsgs", new 
Object[]{getQualifiedName(), node, msgSrcId,
         msgSrcVersion});
     if (node == null) {
-      LOG.warning(getQualifiedName() + "Got dead msg when no node existed. OOS Queing up for add to handle");
+      LOG.warning(getQualifiedName() + "Got dead msg when no node existed. OOS Queuing up for add to handle");
       addToDeadMsgs(msgSrcId, msgSrcVersion);
-      LOG.exiting("OperatorTopologyStructImpl", "addedToDeadMsgs", Arrays.toString(new Object[]{true, getQualifiedName(),
-          node, msgSrcId,
-          msgSrcVersion}));
+      LOG.exiting("OperatorTopologyStructImpl", "addedToDeadMsgs",
+          Arrays.toString(new Object[]{true, getQualifiedName(), node, msgSrcId, msgSrcVersion}));
       return true;
     }
     final int nodeVersion = node.getVersion();
@@ -370,14 +375,12 @@ public class OperatorTopologyStructImpl implements OperatorTopologyStruct {
       LOG.warning(getQualifiedName() + "Got an OOS dead msg. " + "Has HIGHER 
ver-" + msgSrcVersion + " than node ver-"
           + nodeVersion + ". Queing up for add to handle");
       addToDeadMsgs(msgSrcId, msgSrcVersion);
-      LOG.exiting("OperatorTopologyStructImpl", "addedToDeadMsgs", 
Arrays.toString(new Object[]{true, getQualifiedName(),
-          node, msgSrcId,
-          msgSrcVersion}));
+      LOG.exiting("OperatorTopologyStructImpl", "addedToDeadMsgs",
+          Arrays.toString(new Object[]{true, getQualifiedName(), node, 
msgSrcId, msgSrcVersion}));
       return true;
     }
-    LOG.exiting("OperatorTopologyStructImpl", "addedToDeadMsgs", 
Arrays.toString(new Object[]{false, getQualifiedName(),
-        node, msgSrcId,
-        msgSrcVersion}));
+    LOG.exiting("OperatorTopologyStructImpl", "addedToDeadMsgs",
+        Arrays.toString(new Object[]{false, getQualifiedName(), node, 
msgSrcId, msgSrcVersion}));
     return false;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ConcurrentCountingMap.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ConcurrentCountingMap.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ConcurrentCountingMap.java
index d484170..902ccf7 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ConcurrentCountingMap.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ConcurrentCountingMap.java
@@ -91,7 +91,8 @@ public class ConcurrentCountingMap<K, V> {
   }
 
   public static void main(final String[] args) {
-    final 
ConcurrentCountingMap<ReefNetworkGroupCommProtos.GroupCommMessage.Type, String> 
strMap = new ConcurrentCountingMap<>();
+    final 
ConcurrentCountingMap<ReefNetworkGroupCommProtos.GroupCommMessage.Type, String> 
strMap =
+        new ConcurrentCountingMap<>();
     LOG.log(Level.INFO, "OUT: {0}", strMap.isEmpty());
     strMap.add(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, 
"ST0");
     LOG.log(Level.INFO, "OUT: {0}", strMap);

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/Utils.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/Utils.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/Utils.java
index 633c740..5614d0c 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/Utils.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/Utils.java
@@ -34,7 +34,9 @@ public final class Utils {
 
   public static GroupCommunicationMessage bldVersionedGCM(final Class<? 
extends Name<String>> groupName,
                                                           final Class<? 
extends Name<String>> operName,
-                                                          final 
ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType, final String from, 
final int srcVersion,
+                                                          final 
ReefNetworkGroupCommProtos.GroupCommMessage.Type
+                                                              msgType,
+                                                          final String from, 
final int srcVersion,
                                                           final String to, 
final int dstVersion, final byte[]... data) {
 
     return new GroupCommunicationMessage(groupName.getName(), 
operName.getName(), msgType, from, srcVersion, to,

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java
index db9e740..9b91aa8 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java
@@ -118,7 +118,8 @@ public final class NetworkService<T> implements Stage, ConnectionFactory<T> {
                         final EventHandler<Message<T>> recvHandler,
                         final EventHandler<Exception> exHandler) {
     this(factory, nsPort, nameServerAddr, nameServerPort,
-            RETRY_COUNT, RETRY_TIMEOUT, codec, tpFactory, recvHandler, 
exHandler, LocalAddressProviderFactory.getInstance());
+         RETRY_COUNT, RETRY_TIMEOUT, codec, tpFactory, recvHandler, exHandler,
+         LocalAddressProviderFactory.getInstance());
   }
 
   /**
@@ -136,8 +137,8 @@ public final class NetworkService<T> implements Stage, ConnectionFactory<T> {
       final TransportFactory tpFactory,
       final EventHandler<Message<T>> recvHandler,
       final EventHandler<Exception> exHandler) {
-    this(factory, nsPort, nameServerAddr, nameServerPort, retryCount, 
retryTimeout, codec, tpFactory, recvHandler, exHandler,
-        LocalAddressProviderFactory.getInstance());
+    this(factory, nsPort, nameServerAddr, nameServerPort, retryCount, 
retryTimeout, codec, tpFactory, recvHandler,
+         exHandler, LocalAddressProviderFactory.getInstance());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java
index 13d9281..c79f88a 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java
@@ -34,7 +34,8 @@ public class NetworkServiceParameters {
 
   }
 
-  @NamedParameter(doc = "identifier factory for the service", short_name = 
"factory", default_class = StringIdentifierFactory.class)
+  @NamedParameter(doc = "identifier factory for the service", short_name = 
"factory",
+      default_class = StringIdentifierFactory.class)
   public static class NetworkServiceIdentifierFactory implements 
Name<IdentifierFactory> {
   }
 
@@ -46,7 +47,8 @@ public class NetworkServiceParameters {
   public static class NetworkServiceCodec implements Name<Codec<?>> {
   }
 
-  @NamedParameter(doc = "transport factory for the network service", 
short_name = "nstransportfactory", default_class = 
MessagingTransportFactory.class)
+  @NamedParameter(doc = "transport factory for the network service", 
short_name = "nstransportfactory",
+      default_class = MessagingTransportFactory.class)
   public static class NetworkServiceTransportFactory implements 
Name<TransportFactory> {
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java
index c391b67..ecfba79 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java
@@ -62,7 +62,8 @@ public final class NameClient implements NameResolver {
                     final int retryCount,
                     final int retryTimeout,
                     final Cache<Identifier, InetSocketAddress> cache) {
-    this(serverAddr, serverPort, 10000, factory, retryCount, retryTimeout, 
cache, LocalAddressProviderFactory.getInstance());
+    this(serverAddr, serverPort, 10000, factory, retryCount, retryTimeout, 
cache,
+         LocalAddressProviderFactory.getInstance());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java
index e27ca74..0bcd71b 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java
@@ -98,7 +98,8 @@ public class NameLookupClient implements Stage, NamingLookup {
                           final int retryCount,
                           final int retryTimeout,
                           final Cache<Identifier, InetSocketAddress> cache) {
-    this(serverAddr, serverPort, 10000, factory, retryCount, retryTimeout, 
cache, LocalAddressProviderFactory.getInstance());
+    this(serverAddr, serverPort, 10000, factory, retryCount, retryTimeout, 
cache,
+         LocalAddressProviderFactory.getInstance());
   }
 
   @Deprecated

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameRegistryClient.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameRegistryClient.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameRegistryClient.java
index b23481d..6270722 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameRegistryClient.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameRegistryClient.java
@@ -72,7 +72,8 @@ public class NameRegistryClient implements Stage, NamingRegistry {
    * @param factory    an identifier factory
    */
   public NameRegistryClient(
-      final String serverAddr, final int serverPort, final IdentifierFactory 
factory, final LocalAddressProvider localAddressProvider) {
+      final String serverAddr, final int serverPort, final IdentifierFactory 
factory,
+      final LocalAddressProvider localAddressProvider) {
     this(serverAddr, serverPort, 10000, factory, localAddressProvider);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java
index d990aca..8fc7cae 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java
@@ -83,7 +83,8 @@ public final class NameServerImpl implements NameServer {
 
     injector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, 
localAddressProvider.getLocalAddress());
     injector.bindVolatileParameter(RemoteConfiguration.Port.class, port);
-    
injector.bindVolatileParameter(RemoteConfiguration.RemoteServerStage.class, new 
SyncStage<>(new NamingServerHandler(handler, codec)));
+    injector.bindVolatileParameter(RemoteConfiguration.RemoteServerStage.class,
+        new SyncStage<>(new NamingServerHandler(handler, codec)));
 
     try {
       this.transport = injector.getInstance(NettyMessagingTransport.class);

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NamingCodecFactory.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NamingCodecFactory.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NamingCodecFactory.java
index 7fe688a..2fa0cc2 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NamingCodecFactory.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NamingCodecFactory.java
@@ -56,7 +56,8 @@ final class NamingCodecFactory {
     Map<Class<? extends NamingMessage>, Codec<? extends NamingMessage>> 
clazzToCodecMap
         = new HashMap<Class<? extends NamingMessage>, Codec<? extends 
NamingMessage>>();
     clazzToCodecMap.put(NamingRegisterRequest.class, new 
NamingRegisterRequestCodec(factory));
-    clazzToCodecMap.put(NamingRegisterResponse.class, new 
NamingRegisterResponseCodec(new NamingRegisterRequestCodec(factory)));
+    clazzToCodecMap.put(NamingRegisterResponse.class,
+        new NamingRegisterResponseCodec(new 
NamingRegisterRequestCodec(factory)));
     clazzToCodecMap.put(NamingUnregisterRequest.class, new 
NamingUnregisterRequestCodec(factory));
     Codec<NamingMessage> codec = new 
MultiCodec<NamingMessage>(clazzToCodecMap);
     return codec;
@@ -74,7 +75,8 @@ final class NamingCodecFactory {
     clazzToCodecMap.put(NamingLookupRequest.class, new 
NamingLookupRequestCodec(factory));
     clazzToCodecMap.put(NamingLookupResponse.class, new 
NamingLookupResponseCodec(factory));
     clazzToCodecMap.put(NamingRegisterRequest.class, new 
NamingRegisterRequestCodec(factory));
-    clazzToCodecMap.put(NamingRegisterResponse.class, new 
NamingRegisterResponseCodec(new NamingRegisterRequestCodec(factory)));
+    clazzToCodecMap.put(NamingRegisterResponse.class,
+        new NamingRegisterResponseCodec(new 
NamingRegisterRequestCodec(factory)));
     clazzToCodecMap.put(NamingUnregisterRequest.class, new 
NamingUnregisterRequestCodec(factory));
     Codec<NamingMessage> codec = new 
MultiCodec<NamingMessage>(clazzToCodecMap);
     return codec;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingRegisterRequestCodec.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingRegisterRequestCodec.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingRegisterRequestCodec.java
index ee1c848..18a979f 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingRegisterRequestCodec.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/serialization/NamingRegisterRequestCodec.java
@@ -66,7 +66,8 @@ public class NamingRegisterRequestCodec implements Codec<NamingRegisterRequest>
    */
   @Override
   public NamingRegisterRequest decode(byte[] buf) {
-    final AvroNamingRegisterRequest avroNamingRegisterRequest = 
AvroUtils.fromBytes(buf, AvroNamingRegisterRequest.class);
+    final AvroNamingRegisterRequest avroNamingRegisterRequest =
+        AvroUtils.fromBytes(buf, AvroNamingRegisterRequest.class);
     return new NamingRegisterRequest(
         new 
NameAssignmentTuple(factory.getNewInstance(avroNamingRegisterRequest.getId().toString()),
             new 
InetSocketAddress(avroNamingRegisterRequest.getHost().toString(), 
avroNamingRegisterRequest.getPort()))

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-poison/src/main/java/org/apache/reef/poison/params/CrashProbability.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-poison/src/main/java/org/apache/reef/poison/params/CrashProbability.java
 
b/lang/java/reef-poison/src/main/java/org/apache/reef/poison/params/CrashProbability.java
index 7ae940d..bb56109 100644
--- 
a/lang/java/reef-poison/src/main/java/org/apache/reef/poison/params/CrashProbability.java
+++ 
b/lang/java/reef-poison/src/main/java/org/apache/reef/poison/params/CrashProbability.java
@@ -24,7 +24,8 @@ import org.apache.reef.tang.annotations.NamedParameter;
 /**
  * The probability with which a crash will occur.
  */
-@NamedParameter(doc = "the probability with which a crash will occur.", 
default_value = "" + CrashProbability.DEFAULT_VALUE)
+@NamedParameter(doc = "the probability with which a crash will occur.",
+    default_value = "" + CrashProbability.DEFAULT_VALUE)
 public final class CrashProbability implements Name<Double> {
   public static final double DEFAULT_VALUE = 0.1;
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-poison/src/main/java/org/apache/reef/poison/params/CrashTimeout.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-poison/src/main/java/org/apache/reef/poison/params/CrashTimeout.java
 
b/lang/java/reef-poison/src/main/java/org/apache/reef/poison/params/CrashTimeout.java
index c70ae34..1602db0 100644
--- 
a/lang/java/reef-poison/src/main/java/org/apache/reef/poison/params/CrashTimeout.java
+++ 
b/lang/java/reef-poison/src/main/java/org/apache/reef/poison/params/CrashTimeout.java
@@ -21,7 +21,8 @@ package org.apache.reef.poison.params;
 import org.apache.reef.tang.annotations.Name;
 import org.apache.reef.tang.annotations.NamedParameter;
 
-@NamedParameter(doc = "The time window (in seconds) after ContextStart in 
which the crash will occur", default_value = "" + CrashTimeout.DEFAULT_VALUE)
+@NamedParameter(doc = "The time window (in seconds) after ContextStart in 
which the crash will occur",
+    default_value = "" + CrashTimeout.DEFAULT_VALUE)
 public final class CrashTimeout implements Name<Integer> {
   public static final int DEFAULT_VALUE = 10;
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/cli/HDICLI.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/cli/HDICLI.java
 
b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/cli/HDICLI.java
index 8991fa1..651c4ca 100644
--- 
a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/cli/HDICLI.java
+++ 
b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/cli/HDICLI.java
@@ -53,10 +53,14 @@ public final class HDICLI {
     this.hdInsightInstance = hdInsightInstance;
     this.logFetcher = logFetcher;
     final OptionGroup commands = new OptionGroup()
-        
.addOption(OptionBuilder.withArgName(KILL).hasArg().withDescription("Kills the 
given application.").create(KILL))
-        
.addOption(OptionBuilder.withArgName(LOGS).hasArg().withDescription("Fetches 
the logs for the given application.").create(LOGS))
-        
.addOption(OptionBuilder.withArgName(STATUS).hasArg().withDescription("Fetches 
the status for the given application.").create(STATUS))
-        .addOption(OptionBuilder.withArgName(LIST).withDescription("Lists the 
application on the cluster.").create(LIST));
+        .addOption(OptionBuilder.withArgName(KILL).hasArg()
+            .withDescription("Kills the given application.").create(KILL))
+        .addOption(OptionBuilder.withArgName(LOGS).hasArg()
+            .withDescription("Fetches the logs for the given 
application.").create(LOGS))
+        .addOption(OptionBuilder.withArgName(STATUS).hasArg()
+            .withDescription("Fetches the status for the given 
application.").create(STATUS))
+        .addOption(OptionBuilder.withArgName(LIST)
+            .withDescription("Lists the application on the 
cluster.").create(LIST));
     this.options = new Options().addOptionGroup(commands);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/cli/LogFetcher.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/cli/LogFetcher.java
 
b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/cli/LogFetcher.java
index c269287..e8c277f 100644
--- 
a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/cli/LogFetcher.java
+++ 
b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/cli/LogFetcher.java
@@ -65,7 +65,8 @@ final class LogFetcher {
                                                  final String accountKey,
                                                  final String containerName)
       throws URISyntaxException, InvalidKeyException, StorageException {
-    final CloudStorageAccount cloudStorageAccount = 
CloudStorageAccount.parse(getStorageConnectionString(accountName, accountKey));
+    final CloudStorageAccount cloudStorageAccount =
+        CloudStorageAccount.parse(getStorageConnectionString(accountName, 
accountKey));
     final CloudBlobClient blobClient = 
cloudStorageAccount.createCloudBlobClient();
     return blobClient.getContainerReference(containerName);
   }
@@ -98,7 +99,8 @@ final class LogFetcher {
     }
   }
 
-  private FileStatus[] downloadLogs(final String applicationId) throws 
StorageException, IOException, URISyntaxException {
+  private FileStatus[] downloadLogs(final String applicationId)
+      throws StorageException, IOException, URISyntaxException {
     final File localFolder = downloadToTempFolder(applicationId);
     final Path localFolderPath = new Path(localFolder.getAbsolutePath());
     return this.fileSystem.listStatus(localFolderPath);
@@ -113,7 +115,8 @@ final class LogFetcher {
    * @throws StorageException
    * @throws IOException
    */
-  private File downloadToTempFolder(final String applicationId) throws 
URISyntaxException, StorageException, IOException {
+  private File downloadToTempFolder(final String applicationId)
+      throws URISyntaxException, StorageException, IOException {
     final File outputFolder = Files.createTempDirectory("reeflogs-" + 
applicationId).toFile();
     outputFolder.mkdirs();
     final CloudBlobDirectory logFolder = 
this.container.getDirectoryReference(LOG_FOLDER_PREFIX + applicationId + "/");

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/cli/LogFileEntry.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/cli/LogFileEntry.java
 
b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/cli/LogFileEntry.java
index 311ef94..14839a2 100644
--- 
a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/cli/LogFileEntry.java
+++ 
b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/cli/LogFileEntry.java
@@ -97,7 +97,8 @@ final class LogFileEntry {
    * @param numberOfBytes
    * @throws IOException
    */
-  private void write(final DataInputStream stream, final Writer outputWriter, 
final int numberOfBytes) throws IOException {
+  private void write(final DataInputStream stream, final Writer outputWriter, 
final int numberOfBytes)
+      throws IOException {
     final byte[] buf = new byte[65535];
     int lenRemaining = numberOfBytes;
     while (lenRemaining > 0) {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java
 
b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java
index a05b93c..155a6cf 100644
--- 
a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java
+++ 
b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java
@@ -113,7 +113,8 @@ public final class HDInsightJobSubmissionHandler implements JobSubmissionHandler
                   .setCommand(command));
 
       this.hdInsightInstance.submitApplication(applicationSubmission);
-      LOG.log(Level.INFO, "Submitted application to HDInsight. The application 
id is: {0}", applicationID.getApplicationId());
+      LOG.log(Level.INFO, "Submitted application to HDInsight. The application 
id is: {0}",
+          applicationID.getApplicationId());
 
     } catch (final IOException ex) {
       LOG.log(Level.SEVERE, "Error submitting HDInsight request", ex);


Reply via email to