Repository: incubator-reef Updated Branches: refs/heads/master 86413669c -> 05792696a
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyStructImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyStructImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyStructImpl.java index 2ef12b2..83afde4 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyStructImpl.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyStructImpl.java @@ -18,6 +18,7 @@ */ package org.apache.reef.io.network.group.impl.task; +import org.apache.commons.lang.ArrayUtils; import org.apache.reef.exception.evaluator.NetworkException; import org.apache.reef.io.network.group.api.operators.Reduce.ReduceFunction; import org.apache.reef.io.network.group.api.task.NodeStruct; @@ -219,10 +220,6 @@ public class OperatorTopologyStructImpl implements OperatorTopologyStruct { msgType, node})); } - /** - * @param childNode - * @return - */ private byte[] receiveFromNode(final NodeStruct node, final boolean remove) { LOG.entering("OperatorTopologyStructImpl", "receiveFromNode", new Object[]{getQualifiedName(), node, remove}); final byte[] retVal = node.getData(); @@ -241,6 +238,62 @@ public class OperatorTopologyStructImpl implements OperatorTopologyStruct { return retVal; } + /** + * Receive data from {@code node}, while checking if it is trying to send a big message. + * Nodes that send big messages will first send an empty data message and + * wait for an ACK before transmitting the actual big message. Thus the + * receiving side checks whether a message is empty or not, and after sending + * an ACK it must wait for another message if the first message was empty. 
+ * + * @param node node to receive a message from + * @param msgType message type + * @return message sent from {@code node} + */ + private byte[] recvFromNodeCheckBigMsg(final NodeStruct node, + final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType) { + LOG.entering("OperatorTopologyStructImpl", "recvFromNodeCheckBigMsg", new Object[]{node, msgType}); + + byte[] retVal = receiveFromNode(node, false); + if (retVal != null && retVal.length == 0) { + LOG.finest(getQualifiedName() + " Got msg that node " + node.getId() + + " has large data and is ready to send it. Sending ACK to receive data."); + sendToNode(Utils.EMPTY_BYTE_ARR, msgType, node); + retVal = receiveFromNode(node, true); + + if (retVal != null) { + LOG.finest(getQualifiedName() + " Received large msg from node " + node.getId() + + ". Will process it after ACKing."); + sendToNode(Utils.EMPTY_BYTE_ARR, msgType, node); + } else { + LOG.warning(getQualifiedName() + "Expected large msg from node " + node.getId() + + " but received nothing."); + } + } + + LOG.exiting("OperatorTopologyStructImpl", "recvFromNodeCheckBigMsg", + Arrays.toString(new Object[]{retVal, node, msgType})); + return retVal; + } + + /** + * Retrieves and removes the head of {@code nodesWithData}, waiting if necessary until an element becomes available. + * (Comment taken from {@link java.util.concurrent.BlockingQueue}) + * If interrupted while waiting, then throws a RuntimeException. 
+ * + * @return the head of this queue + */ + private NodeStruct nodesWithDataTakeUnsafe() { + LOG.entering("OperatorTopologyStructImpl", "nodesWithDataTakeUnsafe"); + try { + final NodeStruct child = nodesWithData.take(); + LOG.exiting("OperatorTopologyStructImpl", "nodesWithDataTakeUnsafe", child); + return child; + + } catch (final InterruptedException e) { + throw new RuntimeException("InterruptedException while waiting to take data from nodesWithData queue", e); + } + } + @Override public void sendToParent(final byte[] data, final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType) { LOG.entering("OperatorTopologyStructImpl", "sendToParent", new Object[]{getQualifiedName(), data, msgType}); @@ -264,21 +317,25 @@ public class OperatorTopologyStructImpl implements OperatorTopologyStruct { } @Override - public byte[] recvFromParent() { - LOG.entering("OperatorTopologyStructImpl", "recvFromParent", getQualifiedName()); - LOG.finest(getQualifiedName() + "Waiting for " + parent.getId() + " to send data"); - byte[] retVal = receiveFromNode(parent, true); - if (retVal != null && retVal.length == 0) { - LOG.finest(getQualifiedName() + "Got msg that parent " + parent.getId() - + " has large data and is ready to send data. Sending Ack to receive data"); - sendToNode(Utils.EMPTY_BYTE_ARR, ReefNetworkGroupCommProtos.GroupCommMessage.Type.Broadcast, parent); - retVal = receiveFromNode(parent, true); - if (retVal != null) { - LOG.finest(getQualifiedName() + "Received large msg from Parent " + parent.getId() - + ". 
Will return it after ACKing it"); - sendToNode(Utils.EMPTY_BYTE_ARR, ReefNetworkGroupCommProtos.GroupCommMessage.Type.Broadcast, parent); + public void sendToChildren(final Map<String, byte[]> dataMap, + final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType) { + LOG.entering("OperatorTopologyStructImpl", "sendToChildren", new Object[]{getQualifiedName(), dataMap, msgType}); + for (final NodeStruct child : children) { + if (dataMap.containsKey(child.getId())) { + sendToNode(dataMap.get(child.getId()), msgType, child); + } else { + throw new RuntimeException("No message specified for " + child.getId() + " in dataMap."); } } + LOG.exiting("OperatorTopologyStructImpl", "sendToChildren", Arrays.toString(new Object[]{getQualifiedName(), + dataMap, msgType})); + } + + @Override + public byte[] recvFromParent(final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType) { + LOG.entering("OperatorTopologyStructImpl", "recvFromParent", getQualifiedName()); + LOG.finest(getQualifiedName() + "Waiting for " + parent.getId() + " to send data"); + final byte[] retVal = recvFromNodeCheckBigMsg(parent, msgType); LOG.exiting("OperatorTopologyStructImpl", "recvFromParent", Arrays.toString(new Object[]{retVal, getQualifiedName()})); return retVal; @@ -295,26 +352,10 @@ public class OperatorTopologyStructImpl implements OperatorTopologyStruct { while (!childrenToRcvFrom.isEmpty()) { LOG.finest(getQualifiedName() + "Waiting for some child to send data"); - final NodeStruct child; - try { - child = nodesWithData.take(); - } catch (final InterruptedException e) { - throw new RuntimeException("InterruptedException while waiting to take data from nodesWithData queue", e); - } - byte[] retVal = receiveFromNode(child, false); - if (retVal != null && retVal.length == 0) { - LOG.finest(getQualifiedName() + "Got msg that child " + child.getId() - + " has large data and is ready to send data. 
Sending Ack to receive data"); - sendToNode(Utils.EMPTY_BYTE_ARR, ReefNetworkGroupCommProtos.GroupCommMessage.Type.Reduce, child); - retVal = receiveFromNode(child, true); - if (retVal != null) { - LOG.finest(getQualifiedName() + "Received large msg from child " + child.getId() - + ". Will reduce it after ACKing it"); - sendToNode(Utils.EMPTY_BYTE_ARR, ReefNetworkGroupCommProtos.GroupCommMessage.Type.Reduce, child); - } else { - LOG.finest(getQualifiedName() + "Will not reduce it"); - } - } + final NodeStruct child = nodesWithDataTakeUnsafe(); + final byte[] retVal = recvFromNodeCheckBigMsg(child, + ReefNetworkGroupCommProtos.GroupCommMessage.Type.Reduce); + if (retVal != null) { retLst.add(dataCodec.decode(retVal)); if (retLst.size() == 2) { @@ -331,6 +372,38 @@ public class OperatorTopologyStructImpl implements OperatorTopologyStruct { return retVal; } + /** + * Receive data from all children as a single byte array. + * Messages from children are simply byte-concatenated. + * This method is currently used only by the Gather operator. 
+ * + * @return gathered data as a byte array + */ + @Override + public byte[] recvFromChildren() { + LOG.entering("OperatorTopologyStructImpl", "recvFromChildren", getQualifiedName()); + for (final NodeStruct child : children) { + childrenToRcvFrom.add(child.getId()); + } + + byte[] retVal = new byte[0]; + while (!childrenToRcvFrom.isEmpty()) { + LOG.finest(getQualifiedName() + "Waiting for some child to send data"); + final NodeStruct child = nodesWithDataTakeUnsafe(); + final byte[] receivedVal = recvFromNodeCheckBigMsg(child, + ReefNetworkGroupCommProtos.GroupCommMessage.Type.Gather); + + if (receivedVal != null) { + retVal = ArrayUtils.addAll(retVal, receivedVal); + } + childrenToRcvFrom.remove(child.getId()); + } + + LOG.exiting("OperatorTopologyStructImpl", "recvFromChildren", + Arrays.toString(new Object[]{retVal, getQualifiedName()})); + return retVal; + } + private boolean removedDeadMsg(final String msgSrcId, final int msgSrcVersion) { LOG.entering("OperatorTopologyStructImpl", "removedDeadMsg", new Object[]{getQualifiedName(), msgSrcId, msgSrcVersion}); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ScatterData.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ScatterData.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ScatterData.java new file mode 100644 index 0000000..7fc6367 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ScatterData.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
/**
 * Holder for the data a node obtains during a Scatter operation: the encoded
 * elements meant for the node itself, plus the encoded payloads to forward to its children.
 * The constructor stores the given references as-is (no copies are made), and the
 * getters hand back those same references.
 */
public final class ScatterData {

  /** Encoded elements destined for this node, one byte array per element. */
  private final byte[][] myData;

  /** Encoded payloads for this node's children, keyed by child id. */
  private final Map<String, byte[]> childrenData;

  /**
   * Creates a {@code ScatterData} instance wrapping the given data.
   *
   * @param myData       data assigned to this node
   * @param childrenData map of data assigned to this node's children
   */
  public ScatterData(final byte[][] myData, final Map<String, byte[]> childrenData) {
    this.childrenData = childrenData;
    this.myData = myData;
  }

  /**
   * @return the data assigned to this node (the same array passed to the constructor)
   */
  public byte[][] getMyData() {
    return myData;
  }

  /**
   * @return the map of data assigned to this node's children (the same map passed to the constructor)
   */
  public Map<String, byte[]> getChildrenData() {
    return childrenData;
  }
}
+ */ +public final class ScatterDecoder implements Decoder<ScatterData> { + + @Inject + ScatterDecoder() { + } + + public ScatterData decode(final byte[] data) { + try (final DataInputStream dstream = new DataInputStream(new ByteArrayInputStream(data))) { + final int elementCount = dstream.readInt(); + + // first read data that I should receive + final byte[][] myData = new byte[elementCount][]; + for (int index = 0; index < elementCount; index++) { + final int encodedElementLength = dstream.readInt(); + myData[index] = new byte[encodedElementLength]; + dstream.read(myData[index]); + } + + // and then read the data intended for my children + final Map<String, byte[]> childDataMap = new HashMap<>(); + while (dstream.available() > 0) { + final String childId = dstream.readUTF(); + final byte[] childData = new byte[dstream.readInt()]; + dstream.read(childData); + childDataMap.put(childId, childData); + } + + return new ScatterData(myData, childDataMap); + + } catch (final IOException e) { + throw new RuntimeException("IOException", e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ScatterEncoder.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ScatterEncoder.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ScatterEncoder.java new file mode 100644 index 0000000..967dae0 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ScatterEncoder.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.impl.utils; + +import org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient; +import org.apache.reef.io.network.group.impl.driver.TopologySimpleNode; +import org.apache.reef.io.serialization.Codec; +import org.apache.reef.wake.Identifier; + +import javax.inject.Inject; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Encode messages for a scatter operation, which can be decoded by {@code ScatterDecoder}. + */ +public final class ScatterEncoder { + + private final CommunicationGroupServiceClient commGroupClient; + + @Inject + ScatterEncoder(final CommunicationGroupServiceClient commGroupClient) { + this.commGroupClient = commGroupClient; + } + + public <T> Map<String, byte[]> encode(final List<T> elements, + final List<Integer> counts, + final List<? 
extends Identifier> taskOrder, + final Codec<T> dataCodec) { + + // first assign data to all tasks + final Map<String, byte[]> taskIdToBytes = encodeAndDistributeElements(elements, counts, taskOrder, dataCodec); + // then organize the data so that a node keeps its own data as well as its descendants' data + final Map<String, byte[]> childIdToBytes = new HashMap<>(); + + for (final TopologySimpleNode node : commGroupClient.getTopologySimpleNodeRoot().getChildren()) { + childIdToBytes.put(node.getTaskId(), encodeScatterMsgForNode(node, taskIdToBytes)); + } + return childIdToBytes; + } + + /** + * Compute a single byte array message for a node and its children. + * Using {@code taskIdToBytes}, we pack all messages for a + * {@code TopologySimpleNode} and its children into a single byte array. + * + * @param node the target TopologySimpleNode to generate a message for + * @param taskIdToBytes map containing byte array of encoded data for individual Tasks + * @return single byte array message + */ + private byte[] encodeScatterMsgForNode(final TopologySimpleNode node, + final Map<String, byte[]> taskIdToBytes) { + + try (final ByteArrayOutputStream bstream = new ByteArrayOutputStream(); + final DataOutputStream dstream = new DataOutputStream(bstream)) { + + // first write the node's encoded data + final String taskId = node.getTaskId(); + if (taskIdToBytes.containsKey(taskId)) { + dstream.write(taskIdToBytes.get(node.getTaskId())); + + } else { + // in case mapOfTaskToBytes does not contain this node's id, write an empty + // message (zero elements) + dstream.writeInt(0); + } + + // and then write its children's identifiers and their encoded data + for (final TopologySimpleNode child : node.getChildren()) { + dstream.writeUTF(child.getTaskId()); + final byte[] childData = encodeScatterMsgForNode(child, taskIdToBytes); + dstream.writeInt(childData.length); + dstream.write(childData); + } + + return bstream.toByteArray(); + + } catch (final IOException e) { + throw new 
RuntimeException("IOException", e); + } + } + + /** + * Encode elements into byte arrays, and distribute them across Tasks indicated by Identifiers. + * Note that elements are distributed in the exact order specified in + * {@code elements} and not in a round-robin fashion. + * For example, (1, 2, 3, 4) uniformly distributed to (task1, task2, task3) would be + * {task1: (1, 2), task2: (3), task3: (4)}. + * + * @param elements list of data elements to encode + * @param counts list of numbers specifying how many elements each Task should receive + * @param taskOrder list of Identifiers indicating Task Ids + * @param codec class for encoding data + * @param <T> type of data + * @return byte representation of a map of identifiers to encoded data + */ + private <T> Map<String, byte[]> encodeAndDistributeElements(final List<T> elements, + final List<Integer> counts, + final List<? extends Identifier> taskOrder, + final Codec<T> codec) { + final Map<String, byte[]> taskIdToBytes = new HashMap<>(); + + int elementsIndex = 0; + for (int taskOrderIndex = 0; taskOrderIndex < taskOrder.size(); taskOrderIndex++) { + final int elementCount = counts.get(taskOrderIndex); + + try (final ByteArrayOutputStream bstream = new ByteArrayOutputStream(); + final DataOutputStream dstream = new DataOutputStream(bstream)) { + + dstream.writeInt(elementCount); + for (final T element : elements.subList(elementsIndex, elementsIndex + elementCount)) { + final byte[] encodedElement = codec.encode(element); + dstream.writeInt(encodedElement.length); + dstream.write(encodedElement); + } + taskIdToBytes.put(taskOrder.get(taskOrderIndex).toString(), bstream.toByteArray()); + + } catch (final IOException e) { + throw new RuntimeException("IOException", e); + } + + elementsIndex += elementCount; + } + + return taskIdToBytes; + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ScatterHelper.java 
/**
 * Utility class for ScatterSender.
 */
public final class ScatterHelper {

  /**
   * Should not be instantiated.
   */
  private ScatterHelper() {
  }

  /**
   * Uniformly distributes a number of elements across a number of Tasks and returns a list of counts.
   * If uniform distribution is impossible, then some Tasks will receive one
   * more element than others. The sequence of the number of elements for each
   * Task is non-increasing: the first {@code elementCount % taskCount} Tasks get the extra element.
   *
   * @param elementCount number of elements to distribute; must not be negative
   * @param taskCount    number of Tasks that receive elements; must be positive
   * @return list of {@code taskCount} counts specifying how many elements each Task should receive
   * @throws IllegalArgumentException if {@code taskCount} is not positive or
   *                                  {@code elementCount} is negative
   */
  public static List<Integer> getUniformCounts(final int elementCount, final int taskCount) {
    // Without this check, taskCount == 0 surfaces as a bare ArithmeticException from the
    // division below, and a negative taskCount silently drops every element.
    if (taskCount <= 0) {
      throw new IllegalArgumentException("taskCount must be positive, but was " + taskCount);
    }
    // Negative inputs make '/' and '%' truncate toward zero, so the counts would not sum up.
    if (elementCount < 0) {
      throw new IllegalArgumentException("elementCount must not be negative, but was " + elementCount);
    }

    final int quotient = elementCount / taskCount;
    final int remainder = elementCount % taskCount;

    final List<Integer> retList = new ArrayList<>(taskCount);
    for (int taskIndex = 0; taskIndex < taskCount; taskIndex++) {
      retList.add(taskIndex < remainder ? quotient + 1 : quotient);
    }
    return retList;
  }
}
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group; + +import org.apache.reef.io.network.group.api.driver.TaskNode; +import org.apache.reef.io.network.group.impl.driver.TaskNodeImpl; +import org.apache.reef.io.network.group.impl.driver.TopologySerializer; +import org.apache.reef.io.network.group.impl.driver.TopologySimpleNode; +import org.apache.reef.io.network.util.Pair; +import org.apache.reef.io.network.util.StringIdentifierFactory; +import org.apache.reef.wake.Identifier; +import org.apache.reef.wake.IdentifierFactory; +import org.junit.Test; + +import java.util.List; + +import static org.junit.Assert.*; + +public class TopologySerializerTest { + + @Test + public void testEncodeDecode() { + final IdentifierFactory ifac = new StringIdentifierFactory(); + + // create a topology: Task-0[Task-1[Task-3], Task-2] + final TaskNode rootNode = new TaskNodeImpl(null, null, null, "Task-0", null, true); + final TaskNode childNode1 = new TaskNodeImpl(null, null, null, "Task-1", null, false); + final TaskNode childNode2 = new TaskNodeImpl(null, null, null, "Task-2", null, false); + final TaskNode childNode3 = new TaskNodeImpl(null, null, null, "Task-3", null, false); + rootNode.addChild(childNode1); + rootNode.addChild(childNode2); + childNode1.addChild(childNode3); + + final Pair<TopologySimpleNode, List<Identifier>> retPair = + TopologySerializer.decode(TopologySerializer.encode(rootNode), ifac); + + // check topology is recovered + assertEquals(retPair.getFirst().getTaskId(), "Task-0"); + for (final TopologySimpleNode child : retPair.getFirst().getChildren()) { + if (child.getTaskId().equals("Task-1")) { + for (final TopologySimpleNode childchild : child.getChildren()) { + assertEquals(childchild.getTaskId(), "Task-3"); + } + } else { + assertTrue(child.getTaskId().equals("Task-2")); + } + } + + // check identifier list contains [Task-0, Task-1, Task-2, Task-3] and 
nothing else + assertTrue(retPair.getSecond().contains(ifac.getNewInstance("Task-0"))); + assertTrue(retPair.getSecond().contains(ifac.getNewInstance("Task-1"))); + assertTrue(retPair.getSecond().contains(ifac.getNewInstance("Task-2"))); + assertTrue(retPair.getSecond().contains(ifac.getNewInstance("Task-3"))); + assertEquals(retPair.getSecond().size(), 4); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/test/java/org/apache/reef/io/network/group/impl/utils/ScatterCodecTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/group/impl/utils/ScatterCodecTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/group/impl/utils/ScatterCodecTest.java new file mode 100644 index 0000000..61ca5ca --- /dev/null +++ b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/group/impl/utils/ScatterCodecTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.io.network.group.impl.utils; + +import org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient; +import org.apache.reef.io.network.group.impl.driver.TopologySimpleNode; +import org.apache.reef.io.network.util.StringIdentifierFactory; +import org.apache.reef.io.serialization.Codec; +import org.apache.reef.io.serialization.SerializableCodec; +import org.apache.reef.wake.Identifier; +import org.apache.reef.wake.IdentifierFactory; +import org.junit.Test; + +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +/** + * Tests for Scatter codec classes, {@code ScatterEncoder} and {@code ScatterDecoder}. + */ +public final class ScatterCodecTest { + + /** + * Test that {@code ScatterEncoder} and {@code ScatterDecoder} function correctly. + * Create a small topology of 4 nodes and simulate a scatter operation. + */ + @Test + public void testEncodeDecode() { + final IdentifierFactory ifac = new StringIdentifierFactory(); + final Codec<Integer> codec = new SerializableCodec<>(); + + final List<Integer> elements = new LinkedList<>(); + for (int element = 0; element < 400; element++) { + elements.add(element); + } + + final List<Integer> counts = new LinkedList<>(); + final List<Identifier> taskOrder = new LinkedList<>(); + for (int index = 0; index < 4; index++) { + counts.add(100); + taskOrder.add(ifac.getNewInstance("Task-" + index)); + } + + final TopologySimpleNode rootNode = new TopologySimpleNode("Task-0"); + final TopologySimpleNode childNode1 = new TopologySimpleNode("Task-1"); + final TopologySimpleNode childNode2 = new TopologySimpleNode("Task-2"); + final TopologySimpleNode childNode3 = new TopologySimpleNode("Task-3"); + rootNode.addChild(childNode1); + rootNode.addChild(childNode2); + childNode1.addChild(childNode3); + + final CommunicationGroupServiceClient mockCommGroupClient = 
mock(CommunicationGroupServiceClient.class); + when(mockCommGroupClient.getTopologySimpleNodeRoot()).thenReturn(rootNode); + final ScatterEncoder scatterEncoder = new ScatterEncoder(mockCommGroupClient); + final ScatterDecoder scatterDecoder = new ScatterDecoder(); + + final Map<String, byte[]> encodedDataMap = scatterEncoder.encode(elements, counts, taskOrder, codec); + + // check msg correctness for childNode1 (Task-1) + final ScatterData childNode1Data = scatterDecoder.decode(encodedDataMap.get(childNode1.getTaskId())); + for (int index = 0; index < 100; index++) { + assertTrue(index + 100 == codec.decode(childNode1Data.getMyData()[index])); + } + assertTrue(childNode1Data.getChildrenData().containsKey("Task-3")); + assertEquals(childNode1Data.getChildrenData().size(), 1); + + // check msg correctness for childNode2 (Task-2) + final ScatterData childNode2Data = scatterDecoder.decode(encodedDataMap.get(childNode2.getTaskId())); + for (int index = 0; index < 100; index++) { + assertTrue(index + 200 == codec.decode(childNode2Data.getMyData()[index])); + } + assertTrue(childNode2Data.getChildrenData().isEmpty()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/test/java/org/apache/reef/io/network/group/impl/utils/ScatterHelperTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/group/impl/utils/ScatterHelperTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/group/impl/utils/ScatterHelperTest.java new file mode 100644 index 0000000..11fcf75 --- /dev/null +++ b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/group/impl/utils/ScatterHelperTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.impl.utils; + +import org.junit.Test; + +import java.util.List; + +import static org.junit.Assert.*; + +/** + * Tests for util classes related to Scatter. + */ +public final class ScatterHelperTest { + + /** + * Test that {@code ScatterHelper.getUniformCounts} functions correctly by giving various sets of inputs. + */ + @Test + public void testGetUniformCounts() { + for (int taskCount = 1; taskCount < 100; taskCount++) { + final int elementCount = 10000; + final List<Integer> retVals = ScatterHelper.getUniformCounts(elementCount, taskCount); + + int sum = 0; + int maxVal = Integer.MIN_VALUE; + int minVal = Integer.MAX_VALUE; + int prevVal = Integer.MAX_VALUE; + for (final int retVal : retVals) { + sum += retVal; + maxVal = retVal > maxVal ? retVal : maxVal; + minVal = retVal < minVal ? retVal : minVal; + assertTrue(prevVal >= retVal); // monotonic (non-increasing) list check + prevVal = retVal; + } + assertEquals(elementCount, sum); // all elements were considered check + assertEquals(maxVal - minVal, elementCount % taskCount == 0 ? 
0 : 1); // uniform distribution check + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/05792696/lang/java/reef-io/src/test/java/org/apache/reef/io/network/group/impl/utils/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/group/impl/utils/package-info.java b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/group/impl/utils/package-info.java new file mode 100644 index 0000000..8510b08 --- /dev/null +++ b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/group/impl/utils/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Tests for Scatter util classes. + */ +package org.apache.reef.io.network.group.impl.utils;
