http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/SlaveTask.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/SlaveTask.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/SlaveTask.java
new file mode 100644
index 0000000..dde0bc3
--- /dev/null
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/SlaveTask.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.group.broadcast;
+
+import 
org.apache.reef.examples.group.bgd.operatornames.ControlMessageBroadcaster;
+import org.apache.reef.examples.group.bgd.parameters.AllCommunicationGroup;
+import org.apache.reef.examples.group.broadcast.parameters.ModelBroadcaster;
+import 
org.apache.reef.examples.group.broadcast.parameters.ModelReceiveAckReducer;
+import org.apache.reef.examples.group.utils.math.Vector;
+import org.apache.reef.io.network.group.api.operators.Broadcast;
+import org.apache.reef.io.network.group.api.operators.Reduce;
+import org.apache.reef.io.network.group.api.task.CommunicationGroupClient;
+import org.apache.reef.io.network.group.api.task.GroupCommClient;
+import org.apache.reef.task.Task;
+
+import javax.inject.Inject;
+
+/**
+ * Task that loops on broadcast control messages: on ReceiveModel it receives a model and sends an ack through a reduce; on Stop it returns.
+ */
+public class SlaveTask implements Task {
+  private final CommunicationGroupClient communicationGroupClient;
+  private final Broadcast.Receiver<ControlMessages> controlMessageBroadcaster;
+  private final Broadcast.Receiver<Vector> modelBroadcaster;
+  private final Reduce.Sender<Boolean> modelReceiveAckReducer;
+
+  @Inject
+  public SlaveTask(
+      final GroupCommClient groupCommClient) {
+    this.communicationGroupClient = 
groupCommClient.getCommunicationGroup(AllCommunicationGroup.class); // NOTE(review): AllCommunicationGroup and ControlMessageBroadcaster are imported from the bgd example, not from broadcast.parameters - confirm this is intended
+    this.controlMessageBroadcaster = 
communicationGroupClient.getBroadcastReceiver(ControlMessageBroadcaster.class);
+    this.modelBroadcaster = 
communicationGroupClient.getBroadcastReceiver(ModelBroadcaster.class);
+    this.modelReceiveAckReducer = 
communicationGroupClient.getReduceSender(ModelReceiveAckReducer.class);
+  }
+
+  @Override
+  public byte[] call(final byte[] memento) throws Exception { // memento is not read
+    boolean stop = false;
+    while (!stop) { // keep handling control messages until Stop is received
+      final ControlMessages controlMessage = 
controlMessageBroadcaster.receive();
+      switch (controlMessage) {
+        case Stop:
+          stop = true;
+          break;
+
+        case ReceiveModel:
+          modelBroadcaster.receive(); // the received model is discarded
+          if (Math.random() < 0.1) { // hard-coded 10% chance of a simulated failure per received model
+            throw new RuntimeException("Simulated Failure");
+          }
+          modelReceiveAckReducer.send(true); // acknowledge receipt of the model
+          break;
+
+        default: // every other control message is ignored
+          break;
+      }
+    }
+    return null; // the task produces no result
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/AllCommunicationGroup.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/AllCommunicationGroup.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/AllCommunicationGroup.java
new file mode 100644
index 0000000..9e8781e
--- /dev/null
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/AllCommunicationGroup.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.group.broadcast.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * String-typed named parameter declared without doc, short name or default; NOTE(review): SlaveTask imports the same-named class from bgd.parameters - confirm where this one is used.
+ */
+@NamedParameter()
+public final class AllCommunicationGroup implements Name<String> {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/ControlMessageBroadcaster.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/ControlMessageBroadcaster.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/ControlMessageBroadcaster.java
new file mode 100644
index 0000000..4e7607a
--- /dev/null
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/ControlMessageBroadcaster.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.group.broadcast.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+@NamedParameter() // no doc, short name or default value is declared
+public final class ControlMessageBroadcaster implements Name<String> { // NOTE(review): SlaveTask imports the same-named class from bgd.operatornames - confirm where this one is used
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/Dimensions.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/Dimensions.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/Dimensions.java
new file mode 100644
index 0000000..e4814c6
--- /dev/null
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/Dimensions.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.group.broadcast.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * Integer-typed named parameter documented as "Model dimensions"; its short name is "dim" and it declares no default value.
+ */
+@NamedParameter(doc = "Model dimensions", short_name = "dim")
+public class Dimensions implements Name<Integer> {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/FailureProbability.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/FailureProbability.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/FailureProbability.java
new file mode 100644
index 0000000..911ad25
--- /dev/null
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/FailureProbability.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.group.broadcast.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * Double-typed named parameter documented as "Prob(failure)"; its default value is 0.1 and it declares no short name.
+ */
+@NamedParameter(doc = "Prob(failure)", default_value = "0.1")
+public class FailureProbability implements Name<Double> {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/ModelBroadcaster.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/ModelBroadcaster.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/ModelBroadcaster.java
new file mode 100644
index 0000000..0c673bf
--- /dev/null
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/ModelBroadcaster.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.group.broadcast.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+@NamedParameter() // no doc, short name or default value is declared
+public final class ModelBroadcaster implements Name<String> { // SlaveTask passes this class to getBroadcastReceiver to obtain its Broadcast.Receiver<Vector>
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/ModelReceiveAckReducer.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/ModelReceiveAckReducer.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/ModelReceiveAckReducer.java
new file mode 100644
index 0000000..635eeb7
--- /dev/null
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/ModelReceiveAckReducer.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.group.broadcast.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+@NamedParameter() // no doc, short name or default value is declared
+public final class ModelReceiveAckReducer implements Name<String> { // SlaveTask passes this class to getReduceSender to obtain its Reduce.Sender<Boolean>
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/NumberOfReceivers.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/NumberOfReceivers.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/NumberOfReceivers.java
new file mode 100644
index 0000000..6c7983e
--- /dev/null
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/broadcast/parameters/NumberOfReceivers.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.group.broadcast.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * Integer-typed named parameter documented as "The number of receivers for the operators"; its short name is "receivers" and it declares no default value.
+ */
+@NamedParameter(doc = "The number of receivers for the operators", short_name 
= "receivers")
+public class NumberOfReceivers implements Name<Integer> {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/AbstractImmutableVector.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/AbstractImmutableVector.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/AbstractImmutableVector.java
new file mode 100644
index 0000000..9fa8498
--- /dev/null
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/AbstractImmutableVector.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.group.utils.math;
+
+
+import org.apache.reef.io.Tuple;
+
+import java.util.Formatter;
+import java.util.Locale;
+
+/**
+ * Base class for implementing ImmutableVector: dot, sum, norm2, norm2Sqr, min and toString are all derived from get and size.
+ */
+abstract class AbstractImmutableVector implements ImmutableVector {
+
+  @Override
+  public abstract double get(int i);
+
+  @Override
+  public abstract int size();
+
+  @Override
+  public double dot(final Vector that) {
+    assert (this.size() == that.size()); // only enforced when assertions are enabled
+
+    double result = 0.0;
+    for (int index = 0; index < this.size(); ++index) { // iterates over this vector's size
+      result += this.get(index) * that.get(index);
+    }
+    return result;
+  }
+
+
+  @Override
+  public double sum() {
+    double result = 0.0;
+    for (int i = 0; i < this.size(); ++i) {
+      result += this.get(i);
+    }
+    return result;
+  }
+
+  @Override
+  public double norm2() {
+    return Math.sqrt(dot((Vector) this)); // NOTE(review): the cast throws ClassCastException for subclasses that do not implement Vector
+  }
+
+  @Override
+  public double norm2Sqr() {
+    return dot((Vector) this); // NOTE(review): same unchecked cast to Vector as in norm2()
+  }
+
+  @SuppressWarnings("boxing")
+  @Override
+  public String toString() {
+    final StringBuilder b = new StringBuilder("DenseVector("); // NOTE(review): the label is "DenseVector" for every subclass
+    try (final Formatter formatter = new Formatter(b, Locale.US)) {
+      final int size = Math.min(25, this.size()); // at most the first 25 elements are printed
+      for (int i = 0; i < size; ++i) {
+        if (i < size - 1) {
+          formatter.format("%1.3f, ", this.get(i));
+        } else {
+          formatter.format("%1.3f ", this.get(i)); // last printed element: trailing space, no comma
+        }
+      }
+      if (this.size() > 25) {
+        formatter.format("..."); // marks that elements were left out
+      }
+    }
+    b.append(')');
+    return b.toString();
+  }
+
+  @Override
+  public Tuple<Integer, Double> min() {
+    double min = get(0); // element 0 is read unconditionally, even when size() is 0
+    int minIdx = 0;
+    for (int i = 1; i < this.size(); ++i) {
+      final double curVal = get(i);
+      if (curVal < min) { // strict comparison: the first of several equal minima is kept
+        min = curVal;
+        minIdx = i;
+      }
+    }
+    return new Tuple<Integer, Double>(minIdx, min); // (index of the minimum, minimum value)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/AbstractVector.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/AbstractVector.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/AbstractVector.java
new file mode 100644
index 0000000..f2afcef
--- /dev/null
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/AbstractVector.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.group.utils.math;
+
+/**
+ * Abstract base class for {@link Vector} implementations.
+ * <p/>
+ * Subclasses implement get, set and size, plus newInstance, which is declared by Vector and not implemented here.
+ */
+public abstract class AbstractVector extends AbstractImmutableVector 
implements Vector {
+
+  @Override
+  public abstract void set(int i, double v);
+
+
+  @Override
+  public void add(final Vector that) {
+    for (int index = 0; index < this.size(); ++index) { // bounded by this.size(); the size of that is not checked
+      this.set(index, this.get(index) + that.get(index));
+    }
+  }
+
+  @Override
+  public void multAdd(final double factor, final ImmutableVector that) {
+    for (int index = 0; index < this.size(); ++index) { // bounded by this.size(); the size of that is not checked
+      this.set(index, this.get(index) + factor * that.get(index));
+    }
+  }
+
+  @Override
+  public void scale(final double factor) {
+    for (int index = 0; index < this.size(); ++index) {
+      this.set(index, this.get(index) * factor);
+    }
+  }
+
+
+  @Override
+  public void normalize() {
+    final double factor = 1.0 / this.norm2(); // NOTE(review): a norm of 0 is not guarded against; the factor is then infinite
+    this.scale(factor);
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/DenseVector.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/DenseVector.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/DenseVector.java
new file mode 100644
index 0000000..bc46521
--- /dev/null
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/DenseVector.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.group.utils.math;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Random;
+
+/**
+ * A dense {@link Vector} implementation backed by a double[]
+ */
+public class DenseVector extends AbstractVector implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+  private final double[] values;
+
+  /**
+   * Creates a dense vector of the given size; every element starts at 0.0.
+   */
+  public DenseVector(final int size) {
+    this(new double[size]);
+  }
+
+  public DenseVector(final double[] values) {
+    this.values = values; // the given array becomes the backing storage; it is not copied
+  }
+
+  /**
+   * Instantiates a new DenseVector by copying the given other vector.
+   */
+  public DenseVector(final ImmutableVector other) {
+    final int size = other.size();
+    this.values = new double[size];
+    for (int i = 0; i < size; ++i) {
+      this.values[i] = other.get(i);
+    }
+  }
+
+  public DenseVector(final DenseVector other) {
+    this.values = Arrays.copyOf(other.values, other.values.length); // copies the other vector's backing array
+  }
+
+  @Override
+  public void set(final int i, final double v) {
+    this.values[i] = v;
+  }
+
+  @Override
+  public double get(final int i) {
+    return this.values[i];
+  }
+
+  @Override
+  public int size() {
+    return this.values.length;
+  }
+
+  /**
+   * Access the underlying storage. This is unsafe: the backing array itself is returned, not a copy.
+   */
+  public double[] getValues() {
+    return this.values;
+  }
+
+  /**
+   * Creates a random Vector whose elements are each drawn from nextDouble()
+   * of a newly created java.util.Random.
+   *
+   * @param size the number of elements
+   * @return the random Vector of the given size
+   */
+  public static DenseVector rand(final int size) {
+    return rand(size, new Random());
+  }
+
+  /**
+   * Creates a random Vector whose elements are each drawn from
+   * random.nextDouble().
+   *
+   * @param size   the number of elements
+   * @param random the random number generator to use.
+   * @return the random Vector of the given size
+   */
+  public static DenseVector rand(final int size, final Random random) {
+    final DenseVector vec = new DenseVector(size);
+    for (int i = 0; i < size; ++i) {
+      vec.values[i] = random.nextDouble();
+    }
+    return vec;
+  }
+
+  @Override
+  public Vector newInstance() {
+    return new DenseVector(size()); // zero-filled vector of the same size
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/ImmutableVector.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/ImmutableVector.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/ImmutableVector.java
new file mode 100644
index 0000000..cfd2e8d
--- /dev/null
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/ImmutableVector.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.group.utils.math;
+
+
+import org.apache.reef.io.Tuple;
+
+/**
+ * The read-only operations of a vector; the mutable {@link Vector} interface extends this one.
+ */
+public interface ImmutableVector {
+  /**
+   * Access the value of the Vector at dimension i
+   *
+   * @param i index
+   * @return the value at index i
+   */
+  double get(int i);
+
+  /**
+   * The size (dimensionality) of the Vector
+   *
+   * @return the size of the Vector.
+   */
+  int size();
+
+  /**
+   * Computes the inner product with another Vector.
+   *
+   * @param that the other Vector
+   * @return the inner product between two Vectors.
+   */
+  double dot(Vector that);
+
+  /**
+   * Computes the sum of all entries in the Vector.
+   *
+   * @return the sum of all entries in this Vector
+   */
+  double sum();
+
+  /**
+   * Computes the L2 norm of this Vector.
+   *
+   * @return the L2 norm of this Vector.
+   */
+  double norm2();
+
+  /**
+   * Computes the square of the L2 norm of this Vector.
+   *
+   * @return the square of the L2 norm of this Vector.
+   */
+  double norm2Sqr();
+
+  /**
+   * Computes the min of all entries in the Vector
+   *
+   * @return the min of all entries in this Vector; AbstractImmutableVector returns it as a Tuple of (index, value)
+   */
+  Tuple<Integer, Double> min();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/SparseVector.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/SparseVector.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/SparseVector.java
new file mode 100644
index 0000000..48a3b21
--- /dev/null
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/SparseVector.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.group.utils.math;
+
+
+/**
+ * A sparse vector represented by an index and value array: values[i] is the entry at position indices[i], every other position reads as 0.
+ */
+public final class SparseVector extends AbstractImmutableVector {
+
+  private final double[] values;
+  private final int[] indices;
+  private final int size;
+
+
+  public SparseVector(final double[] values, final int[] indices, final int 
size) {
+    this.values = values; // both arrays are stored as given; they are not copied
+    this.indices = indices;
+    this.size = size;
+  }
+
+  public SparseVector(final double[] values, final int[] indices) {
+    this(values, indices, -1); // NOTE(review): size() reports -1 for vectors built this way, so loops bounded by size() do not run
+  }
+
+
+  @Override
+  public double get(final int index) {
+    for (int i = 0; i < indices.length; ++i) { // linear scan over the stored indices; the first match wins
+      if (indices[i] == index) {
+        return values[i];
+      }
+    }
+    return 0; // positions that are not listed in indices are zero
+  }
+
+  @Override
+  public int size() {
+    return this.size;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/Vector.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/Vector.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/Vector.java
new file mode 100644
index 0000000..021a666
--- /dev/null
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/Vector.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.group.utils.math;
+
+import java.io.Serializable;
+
+/**
+ * An interface for Linear Algebra Vectors.
+ * Adds in-place mutation operations on top of the read-only
+ * {@link ImmutableVector} view.
+ */
+public interface Vector extends ImmutableVector, Serializable {
+
+  /**
+   * Set dimension i of the Vector to value v
+   *
+   * @param i the index
+   * @param v value
+   */
+  public void set(final int i, final double v);
+
+  /**
+   * Adds the Vector that to this one in place: this += that.
+   *
+   * @param that the Vector to add to this one
+   */
+  public void add(final Vector that);
+
+  /**
+   * this += factor * that.
+   *
+   * @param factor the scalar that {@code that} is multiplied by before it is added
+   * @param that   the vector to scale and add to this one
+   */
+  public void multAdd(final double factor, final ImmutableVector that);
+
+  /**
+   * Scales this Vector: this *= factor.
+   *
+   * @param factor the scaling factor.
+   */
+  public void scale(final double factor);
+
+
+  /**
+   * Normalizes the Vector according to the L2 norm.
+   */
+  public void normalize();
+
+  /**
+   * Create a new instance of the current type
+   * with elements being zero
+   *
+   * @return zero vector of current dimensionality
+   */
+  public Vector newInstance();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/VectorCodec.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/VectorCodec.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/VectorCodec.java
new file mode 100644
index 0000000..e7ad7e9
--- /dev/null
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/VectorCodec.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.group.utils.math;
+
+import org.apache.reef.wake.remote.Codec;
+
+import javax.inject.Inject;
+import java.io.*;
+
+/**
+ * Codec for the Vector type. Uses Data*Stream.
+ * <p>
+ * Encoded form: an int holding the number of elements, followed by one
+ * double per element, in the layout written by {@link DataOutputStream}.
+ *
+ * @author shravan
+ */
+public class VectorCodec implements Codec<Vector> {
+  /**
+   * This class is instantiated by TANG
+   */
+  @Inject
+  public VectorCodec() {
+    // Intentionally blank
+  }
+
+  /**
+   * Rebuilds a vector from the bytes produced by {@link #encode(Vector)}.
+   *
+   * @param data the encoded vector
+   * @return a DenseVector holding the decoded elements
+   * @throws RuntimeException wrapping the IOException if the data cannot be read,
+   *                          e.g. because it is shorter than its length header claims
+   */
+  @Override
+  public Vector decode(byte[] data) {
+    try (DataInputStream dais = new DataInputStream(new ByteArrayInputStream(data))) {
+      final int size = dais.readInt();
+      final Vector result = new DenseVector(size);
+      for (int i = 0; i < size; i++) {
+        result.set(i, dais.readDouble());
+      }
+      return result;
+    } catch (IOException e) {
+      // Wrap the exception itself: e.getCause() is usually null and would
+      // throw away the actual reason for the failure.
+      throw new RuntimeException("Unable to decode Vector", e);
+    }
+  }
+
+  /**
+   * Serializes the vector as its size followed by all of its elements.
+   *
+   * @param vec the vector to encode
+   * @return the encoded bytes
+   * @throws RuntimeException wrapping the IOException if writing fails
+   */
+  @Override
+  public byte[] encode(Vector vec) {
+    final int size = vec.size();
+    // Pre-size the buffer for the int length header plus one double per element.
+    final ByteArrayOutputStream baos = new ByteArrayOutputStream(
+        Integer.SIZE / Byte.SIZE + size * (Double.SIZE / Byte.SIZE));
+    try (DataOutputStream daos = new DataOutputStream(baos)) {
+      daos.writeInt(size);
+      for (int i = 0; i < size; i++) {
+        daos.writeDouble(vec.get(i));
+      }
+    } catch (IOException e) {
+      // Wrap the exception itself rather than its (usually null) cause.
+      throw new RuntimeException("Unable to encode Vector", e);
+    }
+    return baos.toByteArray();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/Window.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/Window.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/Window.java
new file mode 100644
index 0000000..659e118
--- /dev/null
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/math/Window.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.group.utils.math;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A sliding window over the most recently added values.
+ * Holds at most a fixed number of doubles; once full, adding a value
+ * drops the oldest one. Offers the average of the current contents and
+ * the average that would result from adding one more value.
+ */
+public class Window {
+
+  private final int maxSize;
+  private final List<Double> list;
+
+  /**
+   * @param size the maximum number of values kept in the window; must be positive
+   * @throws IllegalArgumentException if size is not positive
+   */
+  public Window(int size) {
+    // A zero-sized window used to be accepted here and then failed with an
+    // IndexOutOfBoundsException on the first add(); reject it up front.
+    if (size <= 0) {
+      throw new IllegalArgumentException("Window size must be positive, got " + size);
+    }
+    this.maxSize = size;
+    list = new ArrayList<>(size);
+  }
+
+  /**
+   * Appends a value, evicting the oldest one first if the window is full.
+   *
+   * @param d the value to add
+   */
+  public void add(double d) {
+    if (list.size() >= maxSize) {
+      list.remove(0);
+    }
+    list.add(d);
+  }
+
+  /**
+   * @return the average of the values currently in the window, or 0 if it is empty
+   */
+  public double avg() {
+    if (list.isEmpty()) {
+      return 0;
+    }
+    double retVal = 0;
+    for (double d : list) {
+      retVal += d;
+    }
+    return retVal / list.size();
+  }
+
+  /**
+   * Computes the average the window would have after {@code add(d)},
+   * without modifying the window.
+   *
+   * @param d the value that would be added
+   * @return the resulting average
+   */
+  public double avgIfAdded(double d) {
+    if (list.isEmpty()) {
+      return d;
+    }
+    final boolean full = list.size() >= maxSize;
+    // When the window is full the oldest element (index 0) would be evicted.
+    final int start = full ? 1 : 0;
+    final int numElems = full ? maxSize : list.size() + 1;
+    double sum = d;
+    for (int i = start; i < list.size(); i++) {
+      sum += list.get(i);
+    }
+    return sum / numElems;
+  }
+
+  /**
+   * Small self-check: logs the predicted and the actual average side by side
+   * while feeding 1..9 into a window of size 3.
+   */
+  public static void main(String[] args) {
+    final Logger log = Logger.getLogger(Window.class.getName());
+    final Window w = new Window(3);
+    for (int i = 1; i < 10; i++) {
+      final double exp = w.avgIfAdded(i);
+      w.add(i);
+      final double act = w.avg();
+      log.log(Level.INFO, "OUT: Exp: {0} Act: {1}", new Object[] {exp, act});
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/timer/Timer.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/timer/Timer.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/timer/Timer.java
new file mode 100644
index 0000000..729ddf6
--- /dev/null
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/group/utils/timer/Timer.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.group.utils.timer;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Logs how long a block of code takes. Construction logs a "TIMER Start"
+ * line and {@link #close()} logs a "TIMER End" line with the elapsed
+ * seconds, so it can be used in a try-with-resources statement.
+ */
+public class Timer implements AutoCloseable {
+
+  private static final Logger LOG = Logger.getLogger(Timer.class.getName());
+
+  public static final int MINUTES = 60 * 1000;  // ms
+  public static final int HOURS = 60 * MINUTES;
+
+  private static final long NANOS_PER_MILLI = 1000000L;
+
+  private final Logger log;
+  private final Level level;
+  private final String description;
+  /** System.nanoTime() reading taken at construction. */
+  private final long timeStart;
+
+  /**
+   * Times at INFO level using the given logger.
+   *
+   * @param log         the logger to write the start/end lines to
+   * @param description text identifying what is being timed
+   */
+  public Timer(final Logger log, final String description) {
+    this(log, Level.INFO, description);
+  }
+
+  /**
+   * Times at INFO level using this class's own logger.
+   *
+   * @param description text identifying what is being timed
+   */
+  public Timer(final String description) {
+    this(LOG, Level.INFO, description);
+  }
+
+  /**
+   * Starts the timer and logs the start line.
+   *
+   * @param log         the logger to write the start/end lines to
+   * @param level       the level both lines are logged at
+   * @param description text identifying what is being timed
+   */
+  public Timer(final Logger log, final Level level, final String description) {
+    this.log = log;
+    this.level = level;
+    this.description = description;
+    // nanoTime() rather than currentTimeMillis(): the latter follows the wall
+    // clock and can jump when the system time is adjusted.
+    this.timeStart = System.nanoTime();
+    this.log.log(this.level, "TIMER Start: {0}", this.description);
+  }
+
+  /**
+   * Logs the end line with the time elapsed since construction, in seconds
+   * (millisecond resolution).
+   */
+  @Override
+  public void close() {
+    final long elapsedMillis = (System.nanoTime() - this.timeStart) / NANOS_PER_MILLI;
+    this.log.log(this.level, "TIMER End: {0} took {1} sec.",
+        new Object[]{this.description, elapsedMillis / 1000.0});
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/Scheduler.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/Scheduler.java
index 960f297..544f333 100644
--- 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/Scheduler.java
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/Scheduler.java
@@ -28,7 +28,10 @@ import org.apache.reef.tang.Tang;
 
 import javax.annotation.concurrent.ThreadSafe;
 import javax.inject.Inject;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Queue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-examples/src/main/java/org/apache/reef/examples/utils/wake/BlockingEventHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/utils/wake/BlockingEventHandler.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/utils/wake/BlockingEventHandler.java
index e6a5d6c..eefdcde 100644
--- 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/utils/wake/BlockingEventHandler.java
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/utils/wake/BlockingEventHandler.java
@@ -33,8 +33,8 @@ import java.util.List;
 public final class BlockingEventHandler<T> implements EventHandler<T> {
 
   private final int expectedSize;
-  private final EventHandler<Iterable<T>> destination;
   private List<T> events = new ArrayList<>();
+  private final EventHandler<Iterable<T>> destination;
 
   public BlockingEventHandler(final int expectedSize, final 
EventHandler<Iterable<T>> destination) {
     this.expectedSize = expectedSize;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-examples/src/main/java/org/apache/reef/examples/utils/wake/LoggingEventHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/utils/wake/LoggingEventHandler.java
 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/utils/wake/LoggingEventHandler.java
index 9d2d5b8..804e67c 100644
--- 
a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/utils/wake/LoggingEventHandler.java
+++ 
b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/utils/wake/LoggingEventHandler.java
@@ -31,28 +31,27 @@ import java.util.logging.Logger;
  */
 public class LoggingEventHandler<T> implements EventHandler<T> {
 
+  private static final Logger LOG = 
Logger.getLogger(LoggingEventHandler.class.getName());
+
   private final EventHandler<T> downstreamEventHandler;
-  private final String prefix;
-  private final String suffix;
+  private final String format;
 
   /**
-   * @param prefix                 to be logged before the event
    * @param downstreamEventHandler the event handler to hand the event to
-   * @param suffix                 to be logged after the event
+   * @param format                 Format string to log the event, e.g. "Event 
{0} received".
    */
-  public LoggingEventHandler(final String prefix, EventHandler<T> 
downstreamEventHandler, final String suffix) {
+  public LoggingEventHandler(EventHandler<T> downstreamEventHandler, final 
String format) {
     this.downstreamEventHandler = downstreamEventHandler;
-    this.prefix = prefix;
-    this.suffix = suffix;
+    this.format = format;
   }
 
   public LoggingEventHandler(final EventHandler<T> downstreamEventHandler) {
-    this("", downstreamEventHandler, "");
+    this(downstreamEventHandler, "{0}");
   }
 
   @Override
   public void onNext(final T value) {
-    Logger.getLogger(LoggingEventHandler.class.getName()).log(Level.INFO, 
prefix + value.toString() + suffix);
+    LOG.log(Level.INFO, this.format, value);
     this.downstreamEventHandler.onNext(value);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/pom.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/pom.xml b/lang/java/reef-io/pom.xml
index 90bdf1d..b44cb77 100644
--- a/lang/java/reef-io/pom.xml
+++ b/lang/java/reef-io/pom.xml
@@ -45,6 +45,11 @@ under the License.
                                     <arg 
value="--java_out=target/generated-sources/proto"/>
                                     <arg 
value="src/main/proto/ns_protocol.proto"/>
                                 </exec>
+                                <exec executable="protoc">
+                                    <arg value="--proto_path=src/main/proto/"/>
+                                    <arg 
value="--java_out=target/generated-sources/proto"/>
+                                    <arg 
value="src/main/proto/group_comm_protocol.proto"/>
+                                </exec>
                             </tasks>
                             
<sourceRoot>target/generated-sources/proto</sourceRoot>
                         </configuration>

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/GroupChanges.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/GroupChanges.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/GroupChanges.java
new file mode 100644
index 0000000..1758646
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/GroupChanges.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.api;
+
+import org.apache.reef.annotations.audience.TaskSide;
+
+/**
+ * Represents the changes in Topology that happened in a communication group
+ * from the last time the user asked for topology changes
+ */
+@TaskSide
+public interface GroupChanges {
+
+  /**
+   * @return NOTE(review): presumably true when there are topology changes
+   * since the last time they were asked for - confirm against the implementation
+   */
+  boolean exist();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/config/OperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/config/OperatorSpec.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/config/OperatorSpec.java
new file mode 100644
index 0000000..eabd23c
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/config/OperatorSpec.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.api.config;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.io.serialization.Codec;
+
+/**
+ * The specification of an operator submitted by the user
+ * while configuring the communication group.
+ */
+@DriverSide
+@Private
+public interface OperatorSpec {
+
+  /**
+   * @return The codec class to be used to serialize & deserialize data
+   * in the group communication operators.
+   */
+  // NOTE(review): Codec is used as a raw type here; the element type handled
+  // by the codec is not expressed in this signature.
+  Class<? extends Codec> getDataCodecClass();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/CommunicationGroupDriver.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/CommunicationGroupDriver.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/CommunicationGroupDriver.java
new file mode 100644
index 0000000..ca486a3
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/CommunicationGroupDriver.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.api.driver;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.io.network.group.api.task.GroupCommClient;
+import org.apache.reef.io.network.group.impl.config.BroadcastOperatorSpec;
+import org.apache.reef.io.network.group.impl.config.ReduceOperatorSpec;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.annotations.Name;
+
+/**
+ * The driver side interface of a Communication Group
+ * Lets one add operators and tasks.
+ * Main function is to extract the configuration related
+ * to the Group Communication for a task in the comm group
+ */
+@DriverSide
+public interface CommunicationGroupDriver {
+
+  /**
+   * Add the broadcast operator specified by the
+   * 'spec' with name 'operatorName' into this
+   * Communication Group
+   *
+   * @param operatorName the name class identifying the operator
+   * @param spec         the specification of the broadcast operator
+   * @return a CommunicationGroupDriver; NOTE(review): presumably this group,
+   * to allow chaining of calls - confirm against the implementation
+   */
+  public CommunicationGroupDriver addBroadcast(Class<? extends Name<String>> 
operatorName, BroadcastOperatorSpec spec);
+
+  /**
+   * Add the reduce operator specified by the
+   * 'spec' with name 'operatorName' into this
+   * Communication Group
+   *
+   * @param operatorName the name class identifying the operator
+   * @param spec         the specification of the reduce operator
+   * @return a CommunicationGroupDriver; NOTE(review): presumably this group,
+   * to allow chaining of calls - confirm against the implementation
+   */
+  public CommunicationGroupDriver addReduce(Class<? extends Name<String>> 
operatorName, ReduceOperatorSpec spec);
+
+  /**
+   * This signals to the service that no more
+   * operator specs will be added to this communication
+   * group and an attempt to do that will throw an
+   * IllegalStateException
+   */
+  public void finalise();
+
+  /**
+   * Returns a configuration that includes the partial task
+   * configuration passed in as 'taskConf' and makes the
+   * current communication group and the operators configured
+   * on it available on the Task side. Provides for injection
+   * of {@link GroupCommClient}
+   *
+   * @param taskConf the partial task configuration to extend
+   * @return the configuration that includes 'taskConf' and the
+   * group communication configuration for the task
+   */
+  public Configuration getTaskConfiguration(Configuration taskConf);
+
+  /**
+   * Add the task represented by this configuration to this
+   * communication group. The configuration needs to contain
+   * the id of the Task that will be used
+   *
+   * @param partialTaskConf the configuration of the task to add;
+   *                        must contain the task id
+   */
+  public void addTask(Configuration partialTaskConf);
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/GroupCommDriver.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/GroupCommDriver.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/GroupCommDriver.java
new file mode 100644
index 0000000..fc7e919
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/GroupCommDriver.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.api.driver;
+
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.io.network.group.impl.driver.GroupCommDriverImpl;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+import org.apache.reef.tang.annotations.Name;
+
+/**
+ * The driver side interface of Group Communication
+ * which is the entry point for the service
+ */
+@DriverSide
+@Provided
+@DefaultImplementation(value = GroupCommDriverImpl.class)
+public interface GroupCommDriver {
+
+  /**
+   * Create a new communication group with the specified name
+   * and the minimum number of tasks needed in this group before
+   * communication can start
+   *
+   * @param groupName     the name class identifying the communication group
+   * @param numberOfTasks the minimum number of tasks needed in this group
+   *                      before communication can start
+   * @return the driver side handle of the new communication group
+   */
+  CommunicationGroupDriver newCommunicationGroup(Class<? extends Name<String>> 
groupName, int numberOfTasks);
+
+  /**
+   * Tests whether the activeContext is a context configured
+   * using the Group Communication Service
+   *
+   * @param activeContext the context to test
+   * @return whether the context was configured using the
+   * Group Communication Service
+   */
+  boolean isConfigured(ActiveContext activeContext);
+
+  /**
+   * @return Configuration needed for a Context that should have
+   * Group Communication Service enabled
+   */
+  Configuration getContextConfiguration();
+
+  /**
+   * @return Configuration needed to enable
+   * Group Communication as a Service
+   */
+  Configuration getServiceConfiguration();
+
+  /**
+   * @param partialTaskConf the partial configuration of the task
+   * @return Configuration needed for a Task that should have
+   * Group Communication Service enabled
+   */
+  Configuration getTaskConfiguration(Configuration partialTaskConf);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/GroupCommServiceDriver.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/GroupCommServiceDriver.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/GroupCommServiceDriver.java
new file mode 100644
index 0000000..a097075
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/GroupCommServiceDriver.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.api.driver;
+
+
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.driver.task.FailedTask;
+import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.io.network.group.impl.driver.GroupCommDriverImpl;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+import org.apache.reef.wake.EStage;
+
+/**
+ * Extension of {@link GroupCommDriver} that exposes the stages handling
+ * RunningTask, FailedTask and FailedEvaluator events. Not user facing.
+ */
+@Private
+@Provided
+@DefaultImplementation(value = GroupCommDriverImpl.class)
+public interface GroupCommServiceDriver extends GroupCommDriver {
+
+  /**
+   * Not user facing but used by the Group Communication Service class
+   *
+   * @return The running task stage that will handle the RunningTask
+   * events
+   */
+  EStage<RunningTask> getGroupCommRunningTaskStage();
+
+  /**
+   * Not user facing but used by the Group Communication Service class
+   *
+   * @return The failed task stage that will handle the FailedTask
+   * events
+   */
+  EStage<FailedTask> getGroupCommFailedTaskStage();
+
+  /**
+   * Not user facing but used by the Group Communication Service class
+   *
+   * @return The failed evaluator stage that will handle the FailedEvaluator
+   * events
+   */
+  EStage<FailedEvaluator> getGroupCommFailedEvaluatorStage();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/TaskNode.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/TaskNode.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/TaskNode.java
new file mode 100644
index 0000000..d16d4ea
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/TaskNode.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.api.driver;
+
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+
+/**
+ * A node in the Topology representing a Task on the driver.
+ * Implementations should maintain state relating to whether the task is
+ * running/dead and the status of neighboring nodes, and send ctrl msgs to
+ * the tasks indicating topology changing events.
+ */
+public interface TaskNode {
+
+  String getTaskId();
+
+  int getVersion();
+
+  int getNumberOfChildren();
+
+  TaskNode getParent();
+
+  void setParent(TaskNode parent);
+
+  void addChild(TaskNode child);
+
+  void removeChild(TaskNode taskNode);
+
+  boolean isRunning();
+
+  void onRunningTask();
+
+  void onFailedTask();
+
+  boolean hasChanges();
+
+  boolean isNeighborActive(String neighborId);
+
+  void onReceiptOfAcknowledgement(GroupCommunicationMessage msg);
+
+  void onParentRunning();
+
+  void onParentDead();
+
+  void onChildRunning(String childId);
+
+  void onChildDead(String childId);
+
+  /**
+   * Check if this node is ready for sending
+   * TopologySetup
+   */
+  void checkAndSendTopologySetupMessage();
+
+  /**
+   * Check if the neighbor node with id source
+   * is ready for sending TopologySetup
+   * @param source the id of the neighbor node to check
+   */
+  void checkAndSendTopologySetupMessageFor(String source);
+
+  /**
+   * reset topology setup ensures that update topology is not sent to someone
+   * who is already updating topology which is usually when they are just
+   * (re)starting
+   *
+   * @return NOTE(review): the meaning of the returned flag is not documented
+   * here - confirm against the implementation
+   */
+  boolean resetTopologySetupSent();
+
+  void waitForTopologySetupOrFailure();
+
+  void setSibling(TaskNode leaf);
+
+  TaskNode successor();
+
+  void updatingTopology ();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/TaskNodeStatus.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/TaskNodeStatus.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/TaskNodeStatus.java
new file mode 100644
index 0000000..93955e1
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/TaskNodeStatus.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.api.driver;
+
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import 
org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos.GroupCommMessage.Type;
+
+/**
+ * Tracks the Status of the ctrl msgs sent to a
+ * task node in the topology -
+ *   what msgs have been sent to this node and
+ *   what msgs have been ACKed as received by this node
+ *   Status of neighbors
+ * This is used to see whether the local topology
+ * of a Task is completely set-up
+ * It also offers convenience methods for waiting
+ * on receiving ACKs from the task.
+ */
+public interface TaskNodeStatus {
+
+  boolean hasChanges();
+
+  void onTopologySetupMessageSent();
+
+  boolean isActive(String neighborId);
+
+  /**
+   * Process the msg that was received and update
+   * state accordingly
+   */
+  void processAcknowledgement(GroupCommunicationMessage msg);
+
+  /**
+   * To be called before sending a ctrl msg to the task
+   * represented by this node. All ctrl msgs sent to this
+   * node need to be ACKed.
+   * Ctrl msgs will be sent from a src &
+   * ACK sent from the task will be for a src.
+   * As this is called from the TaskNodeImpl use srcId of msg
+   * In TaskNodeImpl while processMsg use dstId of msg
+   */
+  public void expectAckFor(final Type msgType, final String srcId);
+
+  /**
+   * Used when the task has failed to clear all
+   * the state that is associated with this task
+   * Also should release the locks held for implementing
+   * the convenience wait* methods
+   */
+  void clearStateAndReleaseLocks();
+
+  /**
+   * This should remove state concerning neighboring tasks
+   * that have failed
+   */
+  void updateFailureOf(String taskId);
+
+  void waitForTopologySetup();
+
+  /**
+   * Called to denote that an UpdateTopology msg will
+   * be sent
+   */
+  void updatingTopology ();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/Topology.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/Topology.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/Topology.java
new file mode 100644
index 0000000..b393d7f
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/driver/Topology.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.api.driver;
+
+import org.apache.reef.io.network.group.api.config.OperatorSpec;
+import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
+import org.apache.reef.tang.Configuration;
+
+/**
+ * A topology should implement the following
+ * interface so that it can work with the
+ * elastic group communication framework.
+ * Currently we have two implementations:
+ * 1. Flat 2. Tree
+ */
+public interface Topology {
+
+  /**
+   * Get the version of the Task 'taskId'
+   * that belongs to this topology
+   *
+   * @param taskId
+   * @return
+   */
+  int getNodeVersion(String taskId);
+
+  /**
+   * Get the id of the root task
+   *
+   * @return
+   */
+  String getRootId();
+
+  /**
+   * Set task with id 'senderId' as
+   * the root of this topology
+   *
+   * @param senderId
+   */
+  void setRootTask(String senderId);
+
+  /**
+   * Add task with id 'taskId' to
+   * the topology
+   *
+   * @param taskId
+   */
+  void addTask(String taskId);
+
+  /**
+   * Remove task with id 'taskId' from
+   * the topology
+   *
+   * @param taskId
+   */
+  void removeTask(String taskId);
+
+  /**
+   * Update state on receipt of RunningTask
+   * event for task with id 'id'
+   *
+   * @param id
+   */
+  void onRunningTask(String id);
+
+  /**
+   * Update state on receipt of FailedTask
+   * event for task with id 'id'
+   *
+   * @param id
+   */
+  void onFailedTask(String id);
+
+  /**
+   * Set operator specification of the operator
+   * that is the owner of this topology instance
+   *
+   * @param spec
+   */
+  void setOperatorSpecification(OperatorSpec spec);
+
+  /**
+   * Get the topology portion of the Configuration
+   * for the task 'taskId' that belongs to this
+   * topology
+   *
+   * @param taskId
+   * @return
+   */
+  Configuration getTaskConfiguration(String taskId);
+
+  /**
+   * Update state on receipt of a message
+   * from the tasks
+   *
+   * @param msg
+   */
+  void onReceiptOfMessage(GroupCommunicationMessage msg);
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/AbstractGroupCommOperator.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/AbstractGroupCommOperator.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/AbstractGroupCommOperator.java
new file mode 100644
index 0000000..ac89338
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/AbstractGroupCommOperator.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.api.operators;
+
+import org.apache.reef.tang.annotations.Name;
+
+public abstract class AbstractGroupCommOperator implements GroupCommOperator {
+
+  @Override
+  public Class<? extends Name<String>> getOperName() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Class<? extends Name<String>> getGroupName() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void initialize() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getVersion() {
+    throw new UnsupportedOperationException();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/AllGather.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/AllGather.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/AllGather.java
new file mode 100644
index 0000000..8c2d047
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/AllGather.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.api.operators;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.wake.Identifier;
+
+import java.util.List;
+
+/**
+ * MPI AllGather Operator.
+ * <p/>
+ * Each task applies this operator on an element of type T. The result will be
+ * a list of elements constructed using the elements all-gathered at each
+ * task.
+ */
+public interface AllGather<T> extends GroupCommOperator {
+
+  /**
+   * Apply the operation on element.
+   *
+   * @return List of all elements on which the operation was applied using 
default order
+   */
+  List<T> apply(T element) throws NetworkException,
+      InterruptedException;
+
+  /**
+   * Apply the operation on element.
+   *
+   * @return List of all elements on which the operation was applied using 
order specified
+   */
+  List<T> apply(T element, List<? extends Identifier> order)
+      throws NetworkException, InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/AllReduce.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/AllReduce.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/AllReduce.java
new file mode 100644
index 0000000..fd4bda5
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/AllReduce.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.api.operators;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.wake.Identifier;
+
+import java.util.List;
+
+/**
+ * MPI All Reduce Operator. Each task applies this operator on an element of
+ * type T. The result will be an element which is the result of applying a reduce
+ * function on the list of all elements on which this operator has been applied
+ */
+public interface AllReduce<T> extends GroupCommOperator {
+
+  /**
+   * Apply the operation on element.
+   *
+   * @return result of all-reduce on all elements operation was applied on.
+   * Reduce function is applied based on default order.
+   */
+  T apply(T aElement) throws InterruptedException, NetworkException;
+
+  /**
+   * Apply the operation on element.
+   *
+   * @return result of all-reduce on all elements operation was applied on.
+   * Reduce function is applied based on specified order.
+   */
+  T apply(T element, List<? extends Identifier> order) throws 
InterruptedException, NetworkException;
+
+  /**
+   * Get the {@link 
org.apache.reef.io.network.group.api.operators.Reduce.ReduceFunction} 
configured.
+   *
+   * @return {@link 
org.apache.reef.io.network.group.api.operators.Reduce.ReduceFunction}
+   */
+  Reduce.ReduceFunction<T> getReduceFunction();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Broadcast.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Broadcast.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Broadcast.java
new file mode 100644
index 0000000..ae007ac
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Broadcast.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.api.operators;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.group.impl.operators.BroadcastReceiver;
+import org.apache.reef.io.network.group.impl.operators.BroadcastSender;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+
+/**
+ * MPI Broadcast operator.
+ * <p/>
+ * The sender or root sends an element that is received by all the receivers 
or other tasks.
+ * <p/>
+ * This is an asymmetric operation and hence the differentiation b/w Sender 
and Receiver.
+ */
+public interface Broadcast {
+
+  /**
+   * Sender or Root.
+   */
+  @DefaultImplementation(BroadcastSender.class)
+  static interface Sender<T> extends GroupCommOperator {
+
+    /**
+     * Send element to all receivers.
+     */
+    void send(T element) throws NetworkException, InterruptedException;
+  }
+
+  /**
+   * Receivers or Non-roots
+   */
+  @DefaultImplementation(BroadcastReceiver.class)
+  static interface Receiver<T> extends GroupCommOperator {
+
+    /**
+     * Receive the element broadcast by sender.
+     *
+     * @return the element broadcast by sender
+     */
+    T receive() throws NetworkException, InterruptedException;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Gather.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Gather.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Gather.java
new file mode 100644
index 0000000..a04b02e
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Gather.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.api.operators;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.wake.Identifier;
+
+import java.util.List;
+
+/**
+ * MPI Gather Operator.
+ * <p/>
+ * This is an operator where the root is a receiver and there are multiple 
senders.
+ * The root or receiver gathers all the elements sent by the senders in a List.
+ */
+public interface Gather {
+
+  /**
+   * Senders or non-roots.
+   */
+  static interface Sender<T> extends GroupCommOperator {
+
+    /**
+     * Send the element to the root/receiver.
+     */
+    void send(T element) throws InterruptedException, NetworkException;
+  }
+
+  /**
+   * Receiver or Root
+   */
+  static interface Receiver<T> extends GroupCommOperator {
+
+    /**
+     * Receive the elements sent by the senders in default order.
+     *
+     * @return elements sent by senders as a List in default order
+     */
+    List<T> receive() throws InterruptedException, NetworkException;
+
+    /**
+     * Receive the elements sent by the senders in specified order
+     *
+     * @return elements sent by senders as a List in specified order
+     */
+    List<T> receive(List<? extends Identifier> order) throws 
InterruptedException, NetworkException;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/GroupCommOperator.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/GroupCommOperator.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/GroupCommOperator.java
new file mode 100644
index 0000000..ae90d99
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/GroupCommOperator.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.api.operators;
+
+import org.apache.reef.io.network.exception.ParentDeadException;
+import org.apache.reef.tang.annotations.Name;
+
+public interface GroupCommOperator {
+
+  Class<? extends Name<String>> getOperName();
+
+  Class<? extends Name<String>> getGroupName();
+
+  void initialize() throws ParentDeadException;
+
+  int getVersion();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Reduce.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Reduce.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Reduce.java
new file mode 100644
index 0000000..b638348
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/api/operators/Reduce.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.api.operators;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.group.impl.operators.ReduceReceiver;
+import org.apache.reef.io.network.group.impl.operators.ReduceSender;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+import org.apache.reef.wake.Identifier;
+
+import java.util.List;
+
+/**
+ * MPI Reduce operator.
+ * <p/>
+ * This is another operator with root being receiver. All senders send an 
element
+ * to the receiver. These elements are passed through a reduce function and its
+ * result is made available at the root
+ */
+public interface Reduce {
+
+  /**
+   * Receiver or Root
+   */
+  @DefaultImplementation(ReduceReceiver.class)
+  static interface Receiver<T> extends GroupCommOperator {
+
+    /**
+     * Receive values sent by senders and pass them through the reduce
+     * function in default order.
+     *
+     * @return Result of applying reduce function on the elements gathered in 
default order.
+     */
+    T reduce() throws InterruptedException, NetworkException;
+
+    /**
+     * Receive values sent by senders and pass them through the reduce
+     * function in specified order.
+     *
+     * @return Result of applying reduce function on the elements gathered in 
specified order.
+     */
+    T reduce(List<? extends Identifier> order) throws InterruptedException, 
NetworkException;
+
+    /**
+     * The reduce function to be applied on the set of received values
+     *
+     * @return {@link ReduceFunction}
+     */
+    Reduce.ReduceFunction<T> getReduceFunction();
+  }
+
+  /**
+   * Senders or non roots
+   */
+  @DefaultImplementation(ReduceSender.class)
+  static interface Sender<T> extends GroupCommOperator {
+
+    /**
+     * Send the element to the root.
+     */
+    void send(T element) throws NetworkException, InterruptedException;
+
+    /**
+     * The {@link ReduceFunction} to be applied on the set of received values.
+     *
+     * @return {@link ReduceFunction}
+     */
+    Reduce.ReduceFunction<T> getReduceFunction();
+  }
+
+  /**
+   * Interface for a Reduce Function: takes in an {@link Iterable} and returns an
+   * aggregate value computed from the {@link Iterable}
+   */
+  static interface ReduceFunction<T> {
+    /**
+     * Apply the function on elements.
+     *
+     * @return aggregate value computed from elements.
+     */
+    T apply(Iterable<T> elements);
+  }
+}

Reply via email to