Repository: incubator-reef
Updated Branches:
  refs/heads/master 86413669c -> 05792696a


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyStructImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyStructImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyStructImpl.java
index 2ef12b2..83afde4 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyStructImpl.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyStructImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.reef.io.network.group.impl.task;
 
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.reef.exception.evaluator.NetworkException;
 import org.apache.reef.io.network.group.api.operators.Reduce.ReduceFunction;
 import org.apache.reef.io.network.group.api.task.NodeStruct;
@@ -219,10 +220,6 @@ public class OperatorTopologyStructImpl implements OperatorTopologyStruct {
         msgType, node}));
   }
 
-  /**
-   * @param childNode
-   * @return
-   */
   private byte[] receiveFromNode(final NodeStruct node, final boolean remove) {
     LOG.entering("OperatorTopologyStructImpl", "receiveFromNode", new 
Object[]{getQualifiedName(), node, remove});
     final byte[] retVal = node.getData();
@@ -241,6 +238,62 @@ public class OperatorTopologyStructImpl implements OperatorTopologyStruct {
     return retVal;
   }
 
+  /**
+   * Receive data from {@code node}, while checking if it is trying to send a big message.
+   * Nodes that send big messages will first send an empty data message and
+   * wait for an ACK before transmitting the actual big message. Thus the
+   * receiving side checks whether a message is empty or not, and after sending
+   * an ACK it must wait for another message if the first message was empty.
+   * Once the big message has arrived, a second ACK is sent back to {@code node}.
+   *
+   * @param node node to receive a message from
+   * @param msgType message type used for the ACK messages sent back to {@code node}
+   * @return message sent from {@code node}; may be {@code null} when no data was received
+   */
+  private byte[] recvFromNodeCheckBigMsg(final NodeStruct node,
+                                         final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType) {
+    LOG.entering("OperatorTopologyStructImpl", "recvFromNodeCheckBigMsg",
+        new Object[]{getQualifiedName(), node, msgType});
+
+    // NOTE(review): the first receive passes remove=false, whereas the previous
+    // recvFromParent implementation passed remove=true here -- confirm this is intended.
+    byte[] retVal = receiveFromNode(node, false);
+    if (retVal != null && retVal.length == 0) {
+      // An empty message announces a big message: ACK it, then wait for the real payload.
+      LOG.finest(getQualifiedName() + " Got msg that node " + node.getId()
+          + " has large data and is ready to send it. Sending ACK to receive data.");
+      sendToNode(Utils.EMPTY_BYTE_ARR, msgType, node);
+      retVal = receiveFromNode(node, true);
+
+      if (retVal != null) {
+        LOG.finest(getQualifiedName() + " Received large msg from node " + node.getId()
+            + ". Will process it after ACKing.");
+        // Acknowledge receipt of the big message itself.
+        sendToNode(Utils.EMPTY_BYTE_ARR, msgType, node);
+      } else {
+        LOG.warning(getQualifiedName() + " Expected large msg from node " + node.getId()
+            + " but received nothing.");
+      }
+    }
+
+    LOG.exiting("OperatorTopologyStructImpl", "recvFromNodeCheckBigMsg",
+        Arrays.toString(new Object[]{retVal, node, msgType}));
+    return retVal;
+  }
+
+  /**
+   * Retrieves and removes the head of {@code nodesWithData}, waiting if necessary until an element becomes available.
+   * (Comment taken from {@link java.util.concurrent.BlockingQueue})
+   * If interrupted while waiting, the thread's interrupt status is restored and a RuntimeException is thrown.
+   *
+   * @return the head of {@code nodesWithData}
+   * @throws RuntimeException if the calling thread is interrupted while waiting
+   */
+  private NodeStruct nodesWithDataTakeUnsafe() {
+    LOG.entering("OperatorTopologyStructImpl", "nodesWithDataTakeUnsafe");
+    try {
+      final NodeStruct child = nodesWithData.take();
+      LOG.exiting("OperatorTopologyStructImpl", "nodesWithDataTakeUnsafe", child);
+      return child;
+
+    } catch (final InterruptedException e) {
+      // Restore the interrupt flag so callers further up the stack can still observe the interruption.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("InterruptedException while waiting to take data from nodesWithData queue", e);
+    }
+  }
+
   @Override
   public void sendToParent(final byte[] data, final 
ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType) {
     LOG.entering("OperatorTopologyStructImpl", "sendToParent", new 
Object[]{getQualifiedName(), data, msgType});
@@ -264,21 +317,25 @@ public class OperatorTopologyStructImpl implements OperatorTopologyStruct {
   }
 
   @Override
-  public byte[] recvFromParent() {
-    LOG.entering("OperatorTopologyStructImpl", "recvFromParent", 
getQualifiedName());
-    LOG.finest(getQualifiedName() + "Waiting for " + parent.getId() + " to 
send data");
-    byte[] retVal = receiveFromNode(parent, true);
-    if (retVal != null && retVal.length == 0) {
-      LOG.finest(getQualifiedName() + "Got msg that parent " + parent.getId()
-          + " has large data and is ready to send data. Sending Ack to receive 
data");
-      sendToNode(Utils.EMPTY_BYTE_ARR, 
ReefNetworkGroupCommProtos.GroupCommMessage.Type.Broadcast, parent);
-      retVal = receiveFromNode(parent, true);
-      if (retVal != null) {
-        LOG.finest(getQualifiedName() + "Received large msg from Parent " + 
parent.getId()
-            + ". Will return it after ACKing it");
-        sendToNode(Utils.EMPTY_BYTE_ARR, 
ReefNetworkGroupCommProtos.GroupCommMessage.Type.Broadcast, parent);
+  public void sendToChildren(final Map<String, byte[]> dataMap,
+                             final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType) {
+    LOG.entering("OperatorTopologyStructImpl", "sendToChildren", new Object[]{getQualifiedName(), dataMap, msgType});
+    // Check that every child has a message before sending anything, so that a missing
+    // entry does not leave only some of the children having received data.
+    for (final NodeStruct child : children) {
+      if (!dataMap.containsKey(child.getId())) {
+        throw new RuntimeException("No message specified for " + child.getId() + " in dataMap.");
       }
     }
+    for (final NodeStruct child : children) {
+      sendToNode(dataMap.get(child.getId()), msgType, child);
+    }
+    LOG.exiting("OperatorTopologyStructImpl", "sendToChildren", Arrays.toString(new Object[]{getQualifiedName(),
+        dataMap, msgType}));
+  }
+
+  @Override
+  public byte[] recvFromParent(final 
ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType) {
+    LOG.entering("OperatorTopologyStructImpl", "recvFromParent", 
getQualifiedName());
+    LOG.finest(getQualifiedName() + "Waiting for " + parent.getId() + " to 
send data");
+    final byte[] retVal = recvFromNodeCheckBigMsg(parent, msgType);
     LOG.exiting("OperatorTopologyStructImpl", "recvFromParent",
         Arrays.toString(new Object[]{retVal, getQualifiedName()}));
     return retVal;
@@ -295,26 +352,10 @@ public class OperatorTopologyStructImpl implements OperatorTopologyStruct {
 
     while (!childrenToRcvFrom.isEmpty()) {
       LOG.finest(getQualifiedName() + "Waiting for some child to send data");
-      final NodeStruct child;
-      try {
-        child = nodesWithData.take();
-      } catch (final InterruptedException e) {
-        throw new RuntimeException("InterruptedException while waiting to take 
data from nodesWithData queue", e);
-      }
-      byte[] retVal = receiveFromNode(child, false);
-      if (retVal != null && retVal.length == 0) {
-        LOG.finest(getQualifiedName() + "Got msg that child " + child.getId()
-            + " has large data and is ready to send data. Sending Ack to 
receive data");
-        sendToNode(Utils.EMPTY_BYTE_ARR, 
ReefNetworkGroupCommProtos.GroupCommMessage.Type.Reduce, child);
-        retVal = receiveFromNode(child, true);
-        if (retVal != null) {
-          LOG.finest(getQualifiedName() + "Received large msg from child " + 
child.getId()
-              + ". Will reduce it after ACKing it");
-          sendToNode(Utils.EMPTY_BYTE_ARR, 
ReefNetworkGroupCommProtos.GroupCommMessage.Type.Reduce, child);
-        } else {
-          LOG.finest(getQualifiedName() + "Will not reduce it");
-        }
-      }
+      final NodeStruct child = nodesWithDataTakeUnsafe();
+      final byte[] retVal = recvFromNodeCheckBigMsg(child,
+          ReefNetworkGroupCommProtos.GroupCommMessage.Type.Reduce);
+
       if (retVal != null) {
         retLst.add(dataCodec.decode(retVal));
         if (retLst.size() == 2) {
@@ -331,6 +372,38 @@ public class OperatorTopologyStructImpl implements OperatorTopologyStruct {
     return retVal;
   }
 
+  /**
+   * Gather the messages of all children into one byte array.
+   * Each child's message is appended byte for byte, in the order in which children
+   * are taken from {@code nodesWithData}. A child whose message is {@code null}
+   * contributes no bytes but is still counted as done.
+   * This method is currently used only by the Gather operator.
+   *
+   * @return concatenation of the received messages (an empty array if nothing was received)
+   */
+  @Override
+  public byte[] recvFromChildren() {
+    LOG.entering("OperatorTopologyStructImpl", "recvFromChildren", getQualifiedName());
+    for (final NodeStruct childNode : children) {
+      childrenToRcvFrom.add(childNode.getId());
+    }
+
+    byte[] gathered = new byte[0];
+    while (!childrenToRcvFrom.isEmpty()) {
+      LOG.finest(getQualifiedName() + "Waiting for some child to send data");
+      final NodeStruct sender = nodesWithDataTakeUnsafe();
+      final byte[] payload =
+          recvFromNodeCheckBigMsg(sender, ReefNetworkGroupCommProtos.GroupCommMessage.Type.Gather);
+      gathered = payload == null ? gathered : ArrayUtils.addAll(gathered, payload);
+      childrenToRcvFrom.remove(sender.getId());
+    }
+
+    LOG.exiting("OperatorTopologyStructImpl", "recvFromChildren",
+        Arrays.toString(new Object[]{gathered, getQualifiedName()}));
+    return gathered;
+  }
+
   private boolean removedDeadMsg(final String msgSrcId, final int 
msgSrcVersion) {
     LOG.entering("OperatorTopologyStructImpl", "removedDeadMsg", new 
Object[]{getQualifiedName(), msgSrcId,
         msgSrcVersion});

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ScatterData.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ScatterData.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ScatterData.java
new file mode 100644
index 0000000..7fc6367
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ScatterData.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.utils;
+
+import java.util.Map;
+
+/**
+ * Holds the data exchanged between evaluators during a Scatter operation:
+ * the encoded elements assigned to the receiving node itself, and the encoded
+ * messages destined for each of that node's children.
+ */
+public final class ScatterData {
+
+  /** Encoded elements assigned to this node. */
+  private final byte[][] myData;
+
+  /** Encoded messages for this node's children, keyed by child id. */
+  private final Map<String, byte[]> childrenData;
+
+  /**
+   * Create a {@code ScatterData} instance with the given data.
+   * Both arguments are stored as-is; no copies are made.
+   *
+   * @param myData data assigned to this node
+   * @param childrenData map from child id to the data assigned to that child
+   */
+  public ScatterData(final byte[][] myData, final Map<String, byte[]> childrenData) {
+    this.myData = myData;
+    this.childrenData = childrenData;
+  }
+
+  /**
+   * Returns data that is assigned to this node.
+   *
+   * @return the array passed to the constructor as {@code myData}
+   */
+  public byte[][] getMyData() {
+    return myData;
+  }
+
+  /**
+   * Returns a map of data that is assigned to this node's children.
+   *
+   * @return the map passed to the constructor as {@code childrenData}
+   */
+  public Map<String, byte[]> getChildrenData() {
+    return childrenData;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ScatterDecoder.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ScatterDecoder.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ScatterDecoder.java
new file mode 100644
index 0000000..c751dfb
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ScatterDecoder.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.utils;
+
+import org.apache.reef.wake.remote.Decoder;
+
+import javax.inject.Inject;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Decode messages that were created by {@code ScatterEncoder}.
+ *
+ * <p>Expected layout: an int element count, then for each element an int length
+ * followed by that many bytes; all remaining bytes are a sequence of
+ * (UTF child id, int length, bytes) entries holding the messages for the node's children.
+ */
+public final class ScatterDecoder implements Decoder<ScatterData> {
+
+  @Inject
+  ScatterDecoder() {
+  }
+
+  /**
+   * Decode a scatter message into this node's own data and its children's data.
+   *
+   * @param data bytes produced by {@code ScatterEncoder}
+   * @return the decoded {@code ScatterData}
+   * @throws RuntimeException wrapping an {@code IOException}, e.g. an {@code EOFException}
+   *     when {@code data} is shorter than its encoded lengths claim
+   */
+  @Override
+  public ScatterData decode(final byte[] data) {
+    try (final DataInputStream dstream = new DataInputStream(new ByteArrayInputStream(data))) {
+      final int elementCount = dstream.readInt();
+
+      // first read data that I should receive
+      final byte[][] myData = new byte[elementCount][];
+      for (int index = 0; index < elementCount; index++) {
+        final int encodedElementLength = dstream.readInt();
+        myData[index] = new byte[encodedElementLength];
+        // readFully (unlike read) never leaves a partially-filled buffer: it fails on truncated input
+        dstream.readFully(myData[index]);
+      }
+
+      // and then read the data intended for my children
+      final Map<String, byte[]> childDataMap = new HashMap<>();
+      while (dstream.available() > 0) {
+        final String childId = dstream.readUTF();
+        final byte[] childData = new byte[dstream.readInt()];
+        dstream.readFully(childData);
+        childDataMap.put(childId, childData);
+      }
+
+      return new ScatterData(myData, childDataMap);
+
+    } catch (final IOException e) {
+      throw new RuntimeException("IOException", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ScatterEncoder.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ScatterEncoder.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ScatterEncoder.java
new file mode 100644
index 0000000..967dae0
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ScatterEncoder.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.utils;
+
+import 
org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient;
+import org.apache.reef.io.network.group.impl.driver.TopologySimpleNode;
+import org.apache.reef.io.serialization.Codec;
+import org.apache.reef.wake.Identifier;
+
+import javax.inject.Inject;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Encode messages for a scatter operation, which can be decoded by {@code ScatterDecoder}.
+ */
+public final class ScatterEncoder {
+
+  private final CommunicationGroupServiceClient commGroupClient;
+
+  @Inject
+  ScatterEncoder(final CommunicationGroupServiceClient commGroupClient) {
+    this.commGroupClient = commGroupClient;
+  }
+
+  /**
+   * Encode {@code elements} into one scatter message for each child of the topology root.
+   * Elements are first assigned to Tasks according to {@code counts} and {@code taskOrder}.
+   * Each child's message then packs its own elements together with those of its descendants.
+   * Only the root's children get an entry in the returned map.
+   *
+   * @param elements list of data elements to scatter
+   * @param counts number of elements each Task should receive, aligned with {@code taskOrder}
+   * @param taskOrder Task Identifiers, in the order in which elements are assigned
+   * @param dataCodec codec used to encode individual elements
+   * @param <T> type of data
+   * @return map from the Task id of each root child to that child's encoded scatter message
+   */
+  public <T> Map<String, byte[]> encode(final List<T> elements,
+                                        final List<Integer> counts,
+                                        final List<? extends Identifier> taskOrder,
+                                        final Codec<T> dataCodec) {
+
+    // first assign data to all tasks
+    final Map<String, byte[]> taskIdToBytes = encodeAndDistributeElements(elements, counts, taskOrder, dataCodec);
+    // then organize the data so that a node keeps its own data as well as its descendants' data
+    final Map<String, byte[]> childIdToBytes = new HashMap<>();
+
+    for (final TopologySimpleNode node : commGroupClient.getTopologySimpleNodeRoot().getChildren()) {
+      childIdToBytes.put(node.getTaskId(), encodeScatterMsgForNode(node, taskIdToBytes));
+    }
+    return childIdToBytes;
+  }
+
+  /**
+   * Compute a single byte array message for a node and its children.
+   * Using {@code taskIdToBytes}, we pack all messages for a
+   * {@code TopologySimpleNode} and its children into a single byte array.
+   * The message consists of the node's own encoded elements (or a zero element count
+   * if it has none), followed by one (child Task id, message length, message) entry
+   * per child, where each child message is built recursively by this method.
+   *
+   * @param node the target TopologySimpleNode to generate a message for
+   * @param taskIdToBytes map containing byte array of encoded data for individual Tasks
+   * @return single byte array message
+   */
+  private byte[] encodeScatterMsgForNode(final TopologySimpleNode node,
+                                         final Map<String, byte[]> taskIdToBytes) {
+
+    try (final ByteArrayOutputStream bstream = new ByteArrayOutputStream();
+         final DataOutputStream dstream = new DataOutputStream(bstream)) {
+
+      // first write the node's encoded data (values in taskIdToBytes are never null)
+      final byte[] nodeData = taskIdToBytes.get(node.getTaskId());
+      if (nodeData != null) {
+        dstream.write(nodeData);
+
+      } else {
+        // in case taskIdToBytes does not contain this node's id, write an empty
+        // message (zero elements)
+        dstream.writeInt(0);
+      }
+
+      // and then write its children's identifiers and their encoded data
+      for (final TopologySimpleNode child : node.getChildren()) {
+        dstream.writeUTF(child.getTaskId());
+        final byte[] childData = encodeScatterMsgForNode(child, taskIdToBytes);
+        dstream.writeInt(childData.length);
+        dstream.write(childData);
+      }
+
+      return bstream.toByteArray();
+
+    } catch (final IOException e) {
+      throw new RuntimeException("IOException", e);
+    }
+  }
+
+  /**
+   * Encode elements into byte arrays, and distribute them across Tasks indicated by Identifiers.
+   * Note that elements are distributed in the exact order specified in
+   * {@code elements} and not in a round-robin fashion.
+   * For example, (1, 2, 3, 4) uniformly distributed to (task1, task2, task3) would be
+   * {task1: (1, 2), task2: (3), task3: (4)}.
+   * Each Task's bytes hold an int element count followed by an (int length, bytes)
+   * pair for every encoded element.
+   *
+   * @param elements list of data elements to encode
+   * @param counts list of numbers specifying how many elements each Task should receive
+   * @param taskOrder list of Identifiers indicating Task Ids
+   * @param codec class for encoding data
+   * @param <T> type of data
+   * @return map from Task id ({@code Identifier.toString()}) to that Task's encoded elements
+   */
+  private <T> Map<String, byte[]> encodeAndDistributeElements(final List<T> elements,
+                                                              final List<Integer> counts,
+                                                              final List<? extends Identifier> taskOrder,
+                                                              final Codec<T> codec) {
+    final Map<String, byte[]> taskIdToBytes = new HashMap<>();
+
+    int elementsIndex = 0;
+    for (int taskOrderIndex = 0; taskOrderIndex < taskOrder.size(); taskOrderIndex++) {
+      final int elementCount = counts.get(taskOrderIndex);
+
+      try (final ByteArrayOutputStream bstream = new ByteArrayOutputStream();
+           final DataOutputStream dstream = new DataOutputStream(bstream)) {
+
+        dstream.writeInt(elementCount);
+        for (final T element : elements.subList(elementsIndex, elementsIndex + elementCount)) {
+          final byte[] encodedElement = codec.encode(element);
+          dstream.writeInt(encodedElement.length);
+          dstream.write(encodedElement);
+        }
+        taskIdToBytes.put(taskOrder.get(taskOrderIndex).toString(), bstream.toByteArray());
+
+      } catch (final IOException e) {
+        throw new RuntimeException("IOException",  e);
+      }
+
+      elementsIndex += elementCount;
+    }
+
+    return taskIdToBytes;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ScatterHelper.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ScatterHelper.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ScatterHelper.java
new file mode 100644
index 0000000..dc29258
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ScatterHelper.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Utility class for ScatterSender.
+ */
+public final class ScatterHelper {
+
+  /**
+   * Should not be instantiated.
+   */
+  private ScatterHelper() {
+  }
+
+  /**
+   * Uniformly distribute a number of elements across a number of Tasks and return a list of counts.
+   * If uniform distribution is impossible, then some Tasks will receive one
+   * more element than others. The sequence of the number of elements for each
+   * Task is non-increasing.
+   * For example, 10 elements over 3 Tasks yields (4, 3, 3).
+   *
+   * @param elementCount number of elements to distribute; must be non-negative
+   * @param taskCount number of Tasks that receive elements; must be positive
+   * @return list of counts specifying how many elements each Task should receive
+   * @throws IllegalArgumentException if {@code taskCount} is not positive or {@code elementCount} is negative
+   */
+  public static List<Integer> getUniformCounts(final int elementCount, final int taskCount) {
+    if (taskCount <= 0) {
+      throw new IllegalArgumentException("taskCount must be positive, but was " + taskCount);
+    }
+    if (elementCount < 0) {
+      throw new IllegalArgumentException("elementCount must be non-negative, but was " + elementCount);
+    }
+
+    final int quotient = elementCount / taskCount;
+    final int remainder = elementCount % taskCount;
+
+    final List<Integer> retList = new ArrayList<>(taskCount);
+    for (int taskIndex = 0; taskIndex < taskCount; taskIndex++) {
+      // the first `remainder` Tasks absorb the leftover elements, keeping the list non-increasing
+      retList.add(taskIndex < remainder ? quotient + 1 : quotient);
+    }
+    return retList;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/test/java/org/apache/reef/io/network/group/TopologySerializerTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/group/TopologySerializerTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/group/TopologySerializerTest.java
new file mode 100644
index 0000000..d26b4df
--- /dev/null
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/group/TopologySerializerTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group;
+
+import org.apache.reef.io.network.group.api.driver.TaskNode;
+import org.apache.reef.io.network.group.impl.driver.TaskNodeImpl;
+import org.apache.reef.io.network.group.impl.driver.TopologySerializer;
+import org.apache.reef.io.network.group.impl.driver.TopologySimpleNode;
+import org.apache.reef.io.network.util.Pair;
+import org.apache.reef.io.network.util.StringIdentifierFactory;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+public class TopologySerializerTest {
+
+  /**
+   * Encode a small topology with {@code TopologySerializer.encode}, decode it with
+   * {@code TopologySerializer.decode}, and check that both the tree structure and the
+   * list of Task identifiers are recovered.
+   */
+  @Test
+  public void testEncodeDecode() {
+    final IdentifierFactory ifac = new StringIdentifierFactory();
+
+    // create a topology: Task-0[Task-1[Task-3], Task-2]
+    final TaskNode rootNode = new TaskNodeImpl(null, null, null, "Task-0", null, true);
+    final TaskNode childNode1 = new TaskNodeImpl(null, null, null, "Task-1", null, false);
+    final TaskNode childNode2 = new TaskNodeImpl(null, null, null, "Task-2", null, false);
+    final TaskNode childNode3 = new TaskNodeImpl(null, null, null, "Task-3", null, false);
+    rootNode.addChild(childNode1);
+    rootNode.addChild(childNode2);
+    childNode1.addChild(childNode3);
+
+    final Pair<TopologySimpleNode, List<Identifier>> retPair =
+        TopologySerializer.decode(TopologySerializer.encode(rootNode), ifac);
+
+    // check topology is recovered; JUnit's assertEquals takes (expected, actual)
+    final TopologySimpleNode decodedRoot = retPair.getFirst();
+    assertEquals("Task-0", decodedRoot.getTaskId());
+
+    // count children so that a lost subtree cannot make the loops pass vacuously
+    int rootChildCount = 0;
+    for (final TopologySimpleNode child : decodedRoot.getChildren()) {
+      rootChildCount++;
+      if (child.getTaskId().equals("Task-1")) {
+        int grandchildCount = 0;
+        for (final TopologySimpleNode childchild : child.getChildren()) {
+          grandchildCount++;
+          assertEquals("Task-3", childchild.getTaskId());
+        }
+        assertEquals(1, grandchildCount);
+      } else {
+        assertEquals("Task-2", child.getTaskId());
+      }
+    }
+    assertEquals(2, rootChildCount);
+
+    // check identifier list contains [Task-0, Task-1, Task-2, Task-3] and nothing else
+    assertTrue(retPair.getSecond().contains(ifac.getNewInstance("Task-0")));
+    assertTrue(retPair.getSecond().contains(ifac.getNewInstance("Task-1")));
+    assertTrue(retPair.getSecond().contains(ifac.getNewInstance("Task-2")));
+    assertTrue(retPair.getSecond().contains(ifac.getNewInstance("Task-3")));
+    assertEquals(4, retPair.getSecond().size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/test/java/org/apache/reef/io/network/group/impl/utils/ScatterCodecTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/group/impl/utils/ScatterCodecTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/group/impl/utils/ScatterCodecTest.java
new file mode 100644
index 0000000..61ca5ca
--- /dev/null
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/group/impl/utils/ScatterCodecTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.utils;
+
+import 
org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient;
+import org.apache.reef.io.network.group.impl.driver.TopologySimpleNode;
+import org.apache.reef.io.network.util.StringIdentifierFactory;
+import org.apache.reef.io.serialization.Codec;
+import org.apache.reef.io.serialization.SerializableCodec;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.junit.Test;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for Scatter codec classes, {@code ScatterEncoder} and {@code ScatterDecoder}.
+ */
+public final class ScatterCodecTest {
+
+  /**
+   * Test that {@code ScatterEncoder} and {@code ScatterDecoder} function correctly.
+   * Create a small topology of 4 nodes (Task-0[Task-1[Task-3], Task-2]) and simulate a
+   * scatter operation of 400 integers, 100 per Task, including the message that Task-1
+   * has to forward to Task-3.
+   */
+  @Test
+  public void testEncodeDecode() {
+    final IdentifierFactory ifac = new StringIdentifierFactory();
+    final Codec<Integer> codec = new SerializableCodec<>();
+
+    final List<Integer> elements = new LinkedList<>();
+    for (int element = 0; element < 400; element++) {
+      elements.add(element);
+    }
+
+    final List<Integer> counts = new LinkedList<>();
+    final List<Identifier> taskOrder = new LinkedList<>();
+    for (int index = 0; index < 4; index++) {
+      counts.add(100);
+      taskOrder.add(ifac.getNewInstance("Task-" + index));
+    }
+
+    final TopologySimpleNode rootNode = new TopologySimpleNode("Task-0");
+    final TopologySimpleNode childNode1 = new TopologySimpleNode("Task-1");
+    final TopologySimpleNode childNode2 = new TopologySimpleNode("Task-2");
+    final TopologySimpleNode childNode3 = new TopologySimpleNode("Task-3");
+    rootNode.addChild(childNode1);
+    rootNode.addChild(childNode2);
+    childNode1.addChild(childNode3);
+
+    final CommunicationGroupServiceClient mockCommGroupClient = mock(CommunicationGroupServiceClient.class);
+    when(mockCommGroupClient.getTopologySimpleNodeRoot()).thenReturn(rootNode);
+    final ScatterEncoder scatterEncoder = new ScatterEncoder(mockCommGroupClient);
+    final ScatterDecoder scatterDecoder = new ScatterDecoder();
+
+    final Map<String, byte[]> encodedDataMap = scatterEncoder.encode(elements, counts, taskOrder, codec);
+    // only the root's direct children (Task-1, Task-2) receive a message
+    assertEquals(2, encodedDataMap.size());
+
+    // check msg correctness for childNode1 (Task-1); assertEquals takes (expected, actual)
+    final ScatterData childNode1Data = scatterDecoder.decode(encodedDataMap.get(childNode1.getTaskId()));
+    assertEquals(100, childNode1Data.getMyData().length);
+    for (int index = 0; index < 100; index++) {
+      assertEquals(index + 100, (int) codec.decode(childNode1Data.getMyData()[index]));
+    }
+    assertEquals(1, childNode1Data.getChildrenData().size());
+    assertTrue(childNode1Data.getChildrenData().containsKey("Task-3"));
+
+    // check msg correctness for childNode3 (Task-3), which Task-1 forwards
+    final ScatterData childNode3Data = scatterDecoder.decode(childNode1Data.getChildrenData().get("Task-3"));
+    assertEquals(100, childNode3Data.getMyData().length);
+    for (int index = 0; index < 100; index++) {
+      assertEquals(index + 300, (int) codec.decode(childNode3Data.getMyData()[index]));
+    }
+    assertTrue(childNode3Data.getChildrenData().isEmpty());
+
+    // check msg correctness for childNode2 (Task-2)
+    final ScatterData childNode2Data = scatterDecoder.decode(encodedDataMap.get(childNode2.getTaskId()));
+    assertEquals(100, childNode2Data.getMyData().length);
+    for (int index = 0; index < 100; index++) {
+      assertEquals(index + 200, (int) codec.decode(childNode2Data.getMyData()[index]));
+    }
+    assertTrue(childNode2Data.getChildrenData().isEmpty());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/test/java/org/apache/reef/io/network/group/impl/utils/ScatterHelperTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/group/impl/utils/ScatterHelperTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/group/impl/utils/ScatterHelperTest.java
new file mode 100644
index 0000000..11fcf75
--- /dev/null
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/group/impl/utils/ScatterHelperTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.group.impl.utils;
+
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for util classes related to Scatter.
+ */
+public final class ScatterHelperTest {
+
+  /**
+   * Tests that {@code ScatterHelper.getUniformCounts} splits elements uniformly for many task counts.
+   */
+  @Test
+  public void testGetUniformCounts() {
+    final int elementCount = 10000;
+    for (int taskCount = 1; taskCount < 100; taskCount++) {
+      final List<Integer> retVals = ScatterHelper.getUniformCounts(elementCount, taskCount);
+
+      int sum = 0;
+      int maxVal = Integer.MIN_VALUE;
+      int minVal = Integer.MAX_VALUE;
+      int prevVal = Integer.MAX_VALUE;
+      for (final int retVal : retVals) {
+        sum += retVal;
+        maxVal = Math.max(maxVal, retVal);
+        minVal = Math.min(minVal, retVal);
+        assertTrue(prevVal >= retVal); // monotonic (non-increasing) list check
+        prevVal = retVal;
+      }
+      assertEquals(elementCount, sum); // all elements were considered check
+      assertEquals(elementCount % taskCount == 0 ? 0 : 1, maxVal - minVal); // uniform distribution check
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/test/java/org/apache/reef/io/network/group/impl/utils/package-info.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/group/impl/utils/package-info.java
 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/group/impl/utils/package-info.java
new file mode 100644
index 0000000..8510b08
--- /dev/null
+++ 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/group/impl/utils/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Tests for Scatter util classes.
+ */
+package org.apache.reef.io.network.group.impl.utils;

Reply via email to