Repository: incubator-reef Updated Branches: refs/heads/master 0911c0832 -> 6c6ad3367
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyStructImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyStructImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyStructImpl.java new file mode 100644 index 0000000..e3edb01 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyStructImpl.java @@ -0,0 +1,579 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.io.network.group.impl.task; + +import org.apache.reef.exception.evaluator.NetworkException; +import org.apache.reef.io.network.group.api.operators.Reduce.ReduceFunction; +import org.apache.reef.io.network.group.api.task.NodeStruct; +import org.apache.reef.io.network.group.api.task.OperatorTopologyStruct; +import org.apache.reef.io.network.group.impl.GroupCommunicationMessage; +import org.apache.reef.io.network.group.impl.operators.Sender; +import org.apache.reef.io.network.group.impl.utils.Utils; +import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos; +import org.apache.reef.io.serialization.Codec; +import org.apache.reef.tang.annotations.Name; + +import java.util.*; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.logging.Logger; + +/** + * + */ +public class OperatorTopologyStructImpl implements OperatorTopologyStruct { + + private static final int SMALL_MSG_LENGTH = 1 << 20; + + private static final Logger LOG = Logger.getLogger(OperatorTopologyStructImpl.class.getName()); + + private final Class<? extends Name<String>> groupName; + private final Class<? extends Name<String>> operName; + private final String selfId; + private final String driverId; + private final Sender sender; + + private boolean changes = true; + private NodeStruct parent; + private final List<NodeStruct> children = new ArrayList<>(); + + private final BlockingQueue<NodeStruct> nodesWithData = new LinkedBlockingQueue<>(); + private final Set<String> childrenToRcvFrom = new HashSet<>(); + + private final ConcurrentMap<String, Set<Integer>> deadMsgs = new ConcurrentHashMap<>(); + + private final int version; + + public OperatorTopologyStructImpl(final Class<? extends Name<String>> groupName, + final Class<? 
extends Name<String>> operName, final String selfId, + final String driverId, final Sender sender, final int version) { + super(); + this.groupName = groupName; + this.operName = operName; + this.selfId = selfId; + this.driverId = driverId; + this.sender = sender; + this.version = version; + } + + public OperatorTopologyStructImpl(final OperatorTopologyStruct topology) { + super(); + this.groupName = topology.getGroupName(); + this.operName = topology.getOperName(); + this.selfId = topology.getSelfId(); + this.driverId = topology.getDriverId(); + this.sender = topology.getSender(); + this.changes = topology.hasChanges(); + this.parent = topology.getParent(); + this.children.addAll(topology.getChildren()); + this.version = topology.getVersion(); + } + + @Override + public String toString() { + return "OperatorTopologyStruct - " + Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + "(" + selfId + "," + version + ")"; + } + + @Override + public NodeStruct getParent() { + return parent; + } + + @Override + public Collection<? extends NodeStruct> getChildren() { + return children; + } + + @Override + public Class<? extends Name<String>> getGroupName() { + return groupName; + } + + @Override + public Class<? 
extends Name<String>> getOperName() { + return operName; + } + + @Override + public String getSelfId() { + return selfId; + } + + @Override + public String getDriverId() { + return driverId; + } + + @Override + public Sender getSender() { + return sender; + } + + @Override + public boolean hasChanges() { + LOG.entering("OperatorTopologyStructImpl", "hasChanges", getQualifiedName()); + LOG.exiting("OperatorTopologyStructImpl", "hasChanges", Arrays.toString(new Object[]{this.changes, getQualifiedName()})); + return this.changes; + } + + @Override + public int getVersion() { + return version; + } + + @Override + public void addAsData(final GroupCommunicationMessage msg) { + LOG.entering("OperatorTopologyStructImpl", "addAsData", new Object[]{getQualifiedName(), msg}); + final String srcId = msg.getSrcid(); + final NodeStruct node = findNode(srcId); + if (node != null) { + try { + nodesWithData.put(node); + LOG.finest(getQualifiedName() + "Added node " + srcId + " to nodesWithData queue"); + } catch (final InterruptedException e) { + throw new RuntimeException("InterruptedException while adding to childrenWithData queue", e); + } + node.addData(msg); + } else { + LOG.fine("Unable to find node " + srcId + " to send " + msg.getType() + " to"); + } + LOG.exiting("OperatorTopologyStructImpl", "addAsData", Arrays.toString(new Object[]{getQualifiedName(), msg})); + } + + private NodeStruct findNode(final String srcId) { + LOG.entering("OperatorTopologyStructImpl", "findNode", new Object[]{getQualifiedName(), srcId}); + final NodeStruct retVal; + if (parent != null && parent.getId().equals(srcId)) { + retVal = parent; + } else { + retVal = findChild(srcId); + } + LOG.exiting("OperatorTopologyStructImpl", "findNode", Arrays.toString(new Object[]{retVal, getQualifiedName(), srcId})); + return retVal; + } + + private void sendToNode(final byte[] data, final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType, final NodeStruct node) { + 
LOG.entering("OperatorTopologyStructImpl", "sendToNode", new Object[]{getQualifiedName(), data, msgType, node}); + final String nodeId = node.getId(); + try { + + if (data.length > SMALL_MSG_LENGTH) { + LOG.finest(getQualifiedName() + "Msg too big. Sending readiness to send " + msgType + " msg to " + nodeId); + sender.send(Utils.bldVersionedGCM(groupName, operName, msgType, selfId, version, nodeId, node.getVersion(), + Utils.EmptyByteArr)); + final byte[] tmpVal = receiveFromNode(node, true); + if (tmpVal != null) { + LOG.finest(getQualifiedName() + "Got readiness to accept " + msgType + " msg from " + nodeId + + ". Will send actual msg now"); + } else { + LOG.exiting("OperatorTopologyStructImpl", "sendToNode", Arrays.toString(new Object[]{getQualifiedName(), + data, msgType, node})); + return; + } + } + + sender.send(Utils.bldVersionedGCM(groupName, operName, msgType, selfId, version, nodeId, node.getVersion(), data)); + + if (data.length > SMALL_MSG_LENGTH) { + LOG.finest(getQualifiedName() + "Msg too big. Will wait for ACK before queing up one more msg"); + final byte[] tmpVal = receiveFromNode(node, true); + if (tmpVal != null) { + LOG.finest(getQualifiedName() + "Got " + msgType + " msg received ACK from " + nodeId + + ". 
Will move to next msg if it exists"); + } else { + LOG.exiting("OperatorTopologyStructImpl", "sendToNode", Arrays.toString(new Object[]{getQualifiedName(), + data, msgType, node})); + return; + } + } + } catch (final NetworkException e) { + throw new RuntimeException( + "NetworkException while sending " + msgType + " data from " + selfId + " to " + nodeId, + e); + } + LOG.exiting("OperatorTopologyStructImpl", "sendToNode", Arrays.toString(new Object[]{getQualifiedName(), data, + msgType, node})); + } + + /** + * @param childNode + * @return + */ + private byte[] receiveFromNode(final NodeStruct node, final boolean remove) { + LOG.entering("OperatorTopologyStructImpl", "receiveFromNode", new Object[]{getQualifiedName(), node, remove}); + final byte[] retVal = node.getData(); + if (remove) { + final boolean removed = nodesWithData.remove(node); + final String msg = getQualifiedName() + "Removed(" + removed + ") node " + node.getId() + + " from nodesWithData queue"; + if (removed) { + LOG.finest(msg); + } else { + LOG.fine(msg); + } + } + LOG.exiting("OperatorTopologyStructImpl", "receiveFromNode", Arrays.toString(new Object[]{retVal, getQualifiedName(), + node, remove})); + return retVal; + } + + @Override + public void sendToParent(final byte[] data, final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType) { + LOG.entering("OperatorTopologyStructImpl", "sendToParent", new Object[]{getQualifiedName(), data, msgType}); + if (parent != null) { + sendToNode(data, msgType, parent); + } else { + LOG.fine(getQualifiedName() + "Perhaps parent has died or has not been configured"); + } + LOG.exiting("OperatorTopologyStructImpl", "sendToParent", Arrays.toString(new Object[]{getQualifiedName(), data, + msgType})); + } + + @Override + public void sendToChildren(final byte[] data, final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType) { + LOG.entering("OperatorTopologyStructImpl", "sendToChildren", new Object[]{getQualifiedName(), data, msgType}); + for (final 
NodeStruct child : children) { + sendToNode(data, msgType, child); + } + LOG.exiting("OperatorTopologyStructImpl", "sendToChildren", Arrays.toString(new Object[]{getQualifiedName(), + data, msgType})); + } + + @Override + public byte[] recvFromParent() { + LOG.entering("OperatorTopologyStructImpl", "recvFromParent", getQualifiedName()); + LOG.finest(getQualifiedName() + "Waiting for " + parent.getId() + " to send data"); + byte[] retVal = receiveFromNode(parent, true); + if (retVal != null && retVal.length == 0) { + LOG.finest(getQualifiedName() + "Got msg that parent " + parent.getId() + + " has large data and is ready to send data. Sending Ack to receive data"); + sendToNode(Utils.EmptyByteArr, ReefNetworkGroupCommProtos.GroupCommMessage.Type.Broadcast, parent); + retVal = receiveFromNode(parent, true); + if (retVal != null) { + LOG.finest(getQualifiedName() + "Received large msg from Parent " + parent.getId() + + ". Will return it after ACKing it"); + sendToNode(Utils.EmptyByteArr, ReefNetworkGroupCommProtos.GroupCommMessage.Type.Broadcast, parent); + } + } + LOG.exiting("OperatorTopologyStructImpl", "recvFromParent", + Arrays.toString(new Object[]{retVal, getQualifiedName()})); + return retVal; + } + + @Override + public <T> T recvFromChildren(final ReduceFunction<T> redFunc, final Codec<T> dataCodec) { + LOG.entering("OperatorTopologyStructImpl", "recvFromChildren", new Object[]{getQualifiedName(), redFunc, + dataCodec}); + final List<T> retLst = new ArrayList<>(2); + for (final NodeStruct child : children) { + childrenToRcvFrom.add(child.getId()); + } + + while (!childrenToRcvFrom.isEmpty()) { + LOG.finest(getQualifiedName() + "Waiting for some child to send data"); + NodeStruct child; + try { + child = nodesWithData.take(); + } catch (final InterruptedException e) { + throw new RuntimeException("InterruptedException while waiting to take data from nodesWithData queue", e); + } + byte[] retVal = receiveFromNode(child, false); + if (retVal != null && 
retVal.length == 0) { + LOG.finest(getQualifiedName() + "Got msg that child " + child.getId() + + " has large data and is ready to send data. Sending Ack to receive data"); + sendToNode(Utils.EmptyByteArr, ReefNetworkGroupCommProtos.GroupCommMessage.Type.Reduce, child); + retVal = receiveFromNode(child, true); + if (retVal != null) { + LOG.finest(getQualifiedName() + "Received large msg from child " + child.getId() + + ". Will reduce it after ACKing it"); + sendToNode(Utils.EmptyByteArr, ReefNetworkGroupCommProtos.GroupCommMessage.Type.Reduce, child); + } else { + LOG.finest(getQualifiedName() + "Will not reduce it"); + } + } + if (retVal != null) { + retLst.add(dataCodec.decode(retVal)); + if (retLst.size() == 2) { + final T redVal = redFunc.apply(retLst); + retLst.clear(); + retLst.add(redVal); + } + } + childrenToRcvFrom.remove(child.getId()); + } + final T retVal = retLst.isEmpty() ? null : retLst.get(0); + LOG.exiting("OperatorTopologyStructImpl", "recvFromChildren", Arrays.toString(new Object[]{retVal, getQualifiedName(), + redFunc, dataCodec})); + return retVal; + } + + private boolean removedDeadMsg(final String msgSrcId, final int msgSrcVersion) { + LOG.entering("OperatorTopologyStructImpl", "removedDeadMsg", new Object[]{getQualifiedName(), msgSrcId, + msgSrcVersion}); + boolean retVal = false; + final Set<Integer> msgVersions = deadMsgs.get(msgSrcId); + if (msgVersions != null) { + LOG.fine(getQualifiedName() + "Found dead msgs " + msgVersions + " waiting for add"); + if (msgVersions.remove(msgSrcVersion)) { + LOG.fine(getQualifiedName() + "Found dead msg with same version as srcVer-" + msgSrcVersion); + retVal = true; + } else { + LOG.finest(getQualifiedName() + "No dead msg with same version as srcVer-" + msgSrcVersion); + } + } else { + LOG.finest(getQualifiedName() + "No dead msgs waiting for add."); + } + LOG.exiting("OperatorTopologyStructImpl", "removedDeadMsg", + Arrays.toString(new Object[]{retVal, getQualifiedName(), msgSrcId, msgSrcVersion})); 
+ return retVal; + } + + private void addToDeadMsgs(final String srcId, final int version) { + LOG.entering("OperatorTopologyStructImpl", "addToDeadMsgs", new Object[]{getQualifiedName(), srcId, version}); + deadMsgs.putIfAbsent(srcId, new HashSet<Integer>()); + deadMsgs.get(srcId).add(version); + LOG.exiting("OperatorTopologyStructImpl", "addToDeadMsgs", Arrays.toString(new Object[]{getQualifiedName(), + srcId, version})); + } + + private boolean addedToDeadMsgs(final NodeStruct node, final String msgSrcId, final int msgSrcVersion) { + LOG.entering("OperatorTopologyStructImpl", "addedToDeadMsgs", new Object[]{getQualifiedName(), node, msgSrcId, + msgSrcVersion}); + if (node == null) { + LOG.warning(getQualifiedName() + "Got dead msg when no node existed. OOS Queing up for add to handle"); + addToDeadMsgs(msgSrcId, msgSrcVersion); + LOG.exiting("OperatorTopologyStructImpl", "addedToDeadMsgs", Arrays.toString(new Object[]{true, getQualifiedName(), + node, msgSrcId, + msgSrcVersion})); + return true; + } + final int nodeVersion = node.getVersion(); + if (msgSrcVersion > nodeVersion) { + LOG.warning(getQualifiedName() + "Got an OOS dead msg. " + "Has HIGHER ver-" + msgSrcVersion + " than node ver-" + + nodeVersion + ". Queing up for add to handle"); + addToDeadMsgs(msgSrcId, msgSrcVersion); + LOG.exiting("OperatorTopologyStructImpl", "addedToDeadMsgs", Arrays.toString(new Object[]{true, getQualifiedName(), + node, msgSrcId, + msgSrcVersion})); + return true; + } + LOG.exiting("OperatorTopologyStructImpl", "addedToDeadMsgs", Arrays.toString(new Object[]{false, getQualifiedName(), + node, msgSrcId, + msgSrcVersion})); + return false; + } + + /** + * Updates the topology structure with the received + * message. 
Does not make assumptions about msg order + * Tries to handle OOS msgs + * <p/> + * Expects only control messages + */ + @Override + public void update(final GroupCommunicationMessage msg) { + if (msg.hasSrcVersion()) { + final String srcId = msg.getSrcid(); + final int srcVersion = msg.getSrcVersion(); + LOG.finest(getQualifiedName() + "Updating " + msg.getType() + " msg from " + srcId); + LOG.finest(getQualifiedName() + "Before update: parent=" + ((parent != null) ? parent.getId() : "NULL")); + LOG.finest(getQualifiedName() + "Before update: children=" + children); + switch (msg.getType()) { + case ParentAdd: + updateParentAdd(srcId, srcVersion); + break; + case ParentDead: + updateParentDead(srcId, srcVersion); + break; + case ChildAdd: + updateChildAdd(srcId, srcVersion); + break; + case ChildDead: + updateChildDead(srcId, srcVersion); + break; + default: + throw new RuntimeException("Received a non control message in update"); + } + LOG.finest(getQualifiedName() + "After update: parent=" + ((parent != null) ? parent.getId() : "NULL")); + LOG.finest(getQualifiedName() + "After update: children=" + children); + } else { + throw new RuntimeException(getQualifiedName() + "can only deal with msgs that have src version set"); + } + } + + private void updateChildDead(final String srcId, final int srcVersion) { + LOG.entering("OperatorTopologyStructImpl", "updateChildDead", + new Object[]{getQualifiedName(), srcId, srcVersion}); + final NodeStruct toBeRemovedchild = findChild(srcId); + if (!addedToDeadMsgs(toBeRemovedchild, srcId, srcVersion)) { + final int childVersion = toBeRemovedchild.getVersion(); + if (srcVersion < childVersion) { + LOG.finest(getQualifiedName() + "Got an OOS child dead msg. " + "Has LOWER ver-" + srcVersion + + " than child ver-" + childVersion + ". 
Discarding"); + LOG.exiting("OperatorTopologyStructImpl", "updateChildDead", Arrays.toString(new Object[]{getQualifiedName(), + srcId, srcVersion})); + return; + } else { + LOG.finest(getQualifiedName() + "Got a child dead msg. " + "Has SAME ver-" + srcVersion + " as child ver-" + + childVersion + "Removing child node"); + } + } else { + LOG.fine(getQualifiedName() + "Added to dead msgs. Removing child node since ChildAdd might not turn up"); + } + children.remove(toBeRemovedchild); + LOG.exiting("OperatorTopologyStructImpl", "updateChildDead", Arrays.toString(new Object[]{getQualifiedName(), + srcId, srcVersion})); + } + + private void updateChildAdd(final String srcId, final int srcVersion) { + LOG.entering("OperatorTopologyStructImpl", "updateChildAdd", new Object[]{getQualifiedName(), srcId, srcVersion}); + if (!removedDeadMsg(srcId, srcVersion)) { + final NodeStruct toBeAddedchild = findChild(srcId); + if (toBeAddedchild != null) { + LOG.warning(getQualifiedName() + "Child already exists"); + final int childVersion = toBeAddedchild.getVersion(); + if (srcVersion < childVersion) { + LOG.fine(getQualifiedName() + "Got an OOS child add msg. " + "Has LOWER ver-" + srcVersion + + " than child ver-" + childVersion + ". Discarding"); + LOG.exiting("OperatorTopologyStructImpl", "updateChildAdd", + Arrays.toString(new Object[]{getQualifiedName(), srcId, srcVersion})); + return; + } + if (srcVersion > childVersion) { + LOG.fine(getQualifiedName() + "Got an OOS child add msg. " + "Has HIGHER ver-" + srcVersion + + " than child ver-" + childVersion + ". 
Bumping up version number"); + toBeAddedchild.setVersion(srcVersion); + LOG.exiting("OperatorTopologyStructImpl", "updateChildAdd", + Arrays.toString(new Object[]{getQualifiedName(), srcId, srcVersion})); + return; + } else { + throw new RuntimeException(getQualifiedName() + "Got two child add msgs of same version-" + srcVersion); + } + } else { + LOG.finest(getQualifiedName() + "Creating new child node for " + srcId); + children.add(new ChildNodeStruct(srcId, srcVersion)); + } + } else { + LOG.warning(getQualifiedName() + "Removed dead msg. Not adding child"); + } + LOG.exiting("OperatorTopologyStructImpl", "updateChildAdd", Arrays.toString(new Object[]{getQualifiedName(), + srcId, srcVersion})); + } + + private void updateParentDead(final String srcId, final int srcVersion) { + LOG.entering("OperatorTopologyStructImpl", "updateParentDead", + new Object[]{getQualifiedName(), srcId, srcVersion}); + if (!addedToDeadMsgs(parent, srcId, srcVersion)) { + final int parentVersion = parent.getVersion(); + if (srcVersion < parentVersion) { + LOG.fine(getQualifiedName() + "Got an OOS parent dead msg. " + "Has LOWER ver-" + srcVersion + + " than parent ver-" + parentVersion + ". Discarding"); + LOG.exiting("OperatorTopologyStructImpl", "updateParentDead", + Arrays.toString(new Object[]{getQualifiedName(), srcId, srcVersion})); + return; + } else { + LOG.finest(getQualifiedName() + "Got a parent dead msg. " + "Has SAME ver-" + srcVersion + " as parent ver-" + + parentVersion + "Setting parent node to null"); + } + } else { + LOG.warning(getQualifiedName() + "Added to dead msgs. 
Setting parent to null since ParentAdd might not turn up"); + } + parent = null; + LOG.exiting("OperatorTopologyStructImpl", "updateParentDead", Arrays.toString(new Object[]{getQualifiedName(), + srcId, srcVersion})); + } + + private void updateParentAdd(final String srcId, final int srcVersion) { + LOG.entering("OperatorTopologyStructImpl", "updateParentAdd", + new Object[]{getQualifiedName(), srcId, srcVersion}); + if (!removedDeadMsg(srcId, srcVersion)) { + if (parent != null) { + LOG.fine(getQualifiedName() + "Parent already exists"); + final int parentVersion = parent.getVersion(); + if (srcVersion < parentVersion) { + LOG.fine(getQualifiedName() + "Got an OOS parent add msg. " + "Has LOWER ver-" + srcVersion + + " than parent ver-" + parentVersion + ". Discarding"); + LOG.exiting("OperatorTopologyStructImpl", "updateParentAdd", + Arrays.toString(new Object[]{getQualifiedName(), srcId, srcVersion})); + return; + } + if (srcVersion > parentVersion) { + LOG.fine(getQualifiedName() + "Got an OOS parent add msg. " + "Has HIGHER ver-" + srcVersion + + " than parent ver-" + parentVersion + ". Bumping up version number"); + parent.setVersion(srcVersion); + LOG.exiting("OperatorTopologyStructImpl", "updateParentAdd", + Arrays.toString(new Object[]{getQualifiedName(), srcId, srcVersion})); + return; + } else { + throw new RuntimeException(getQualifiedName() + "Got two parent add msgs of same version-" + srcVersion); + } + } else { + LOG.finest(getQualifiedName() + "Creating new parent node for " + srcId); + parent = new ParentNodeStruct(srcId, srcVersion); + } + } else { + LOG.fine(getQualifiedName() + "Removed dead msg. 
Not adding parent"); + } + LOG.exiting("OperatorTopologyStructImpl", "updateParentAdd", Arrays.toString(new Object[]{getQualifiedName(), + srcId, srcVersion})); + } + + /** + * @param srcId + * @return + */ + private NodeStruct findChild(final String srcId) { + LOG.entering("OperatorTopologyStructImpl", "findChild", new Object[]{getQualifiedName(), srcId}); + NodeStruct retVal = null; + for (final NodeStruct node : children) { + if (node.getId().equals(srcId)) { + retVal = node; + break; + } + } + LOG.exiting("OperatorTopologyStructImpl", "findChild", Arrays.toString(new Object[]{retVal, getQualifiedName(), + srcId})); + return retVal; + } + + @Override + public void update(final Set<GroupCommunicationMessage> deletionDeltas) { + LOG.entering("OperatorTopologyStructImpl", "update", new Object[]{"Updating topology with deleting msgs", + getQualifiedName(), deletionDeltas}); + for (final GroupCommunicationMessage delDelta : deletionDeltas) { + update(delDelta); + } + LOG.exiting("OperatorTopologyStructImpl", "update", Arrays.toString(new Object[]{getQualifiedName(), + deletionDeltas})); + } + + @Override + public void setChanges(final boolean changes) { + LOG.entering("OperatorTopologyStructImpl", "setChanges", new Object[]{getQualifiedName(), changes}); + this.changes = changes; + LOG.exiting("OperatorTopologyStructImpl", "setChanges", + Arrays.toString(new Object[]{getQualifiedName(), changes})); + } + + private String getQualifiedName() { + return Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + ":" + selfId + ":ver(" + version + ") - "; + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/ParentNodeStruct.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/ParentNodeStruct.java 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/ParentNodeStruct.java new file mode 100644 index 0000000..999f49d --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/ParentNodeStruct.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group.impl.task; + +import org.apache.reef.io.network.group.impl.GroupCommunicationMessage; +import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos; + +import java.util.logging.Logger; + +/** + * + */ +public class ParentNodeStruct extends NodeStructImpl { + + private static final Logger LOG = Logger.getLogger(ParentNodeStruct.class.getName()); + + public ParentNodeStruct(final String id, final int version) { + super(id, version); + } + + @Override + public boolean checkDead(final GroupCommunicationMessage gcm) { + LOG.entering("ParentNodeStruct", "checkDead", gcm); + final boolean retVal = gcm.getType() == ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentDead ? 
true : false; + LOG.exiting("ParentNodeStruct", "checkDead", gcm); + return retVal; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/BroadcastingEventHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/BroadcastingEventHandler.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/BroadcastingEventHandler.java new file mode 100644 index 0000000..cdc46fc --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/BroadcastingEventHandler.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.io.network.group.impl.utils; + +import org.apache.reef.wake.EventHandler; + +import java.util.ArrayList; +import java.util.List; + +/** + * + */ +public class BroadcastingEventHandler<T> implements EventHandler<T> { + + List<EventHandler<T>> handlers = new ArrayList<>(); + + public void addHandler(final EventHandler<T> handler) { + handlers.add(handler); + } + + @Override + public void onNext(final T msg) { + for (final EventHandler<T> handler : handlers) { + handler.onNext(msg); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ConcurrentCountingMap.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ConcurrentCountingMap.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ConcurrentCountingMap.java new file mode 100644 index 0000000..7dc651b --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/ConcurrentCountingMap.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.io.network.group.impl.utils; + +import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Utility map class that wraps a CountingMap + * in a ConcurrentMap + * Equivalent to Map<K,Map<V,Integer>> + */ +public class ConcurrentCountingMap<K, V> { + + private final ConcurrentMap<K, CountingMap<V>> map = new ConcurrentHashMap<>(); + + public boolean remove (final K key, final V value) { + if (!map.containsKey(key)) { + return false; + } + final boolean retVal = map.get(key).remove(value); + if (map.get(key).isEmpty()) { + map.remove(key); + } + return retVal; + } + + public void add (final K key, final V value) { + map.putIfAbsent(key, new CountingMap<V>()); + map.get(key).add(value); + } + + public CountingMap<V> get (final K key) { + return map.get(key); + } + + public boolean isEmpty () { + return map.isEmpty(); + } + + public boolean containsKey (final K key) { + return map.containsKey(key); + } + + public boolean contains (final K key, final V value) { + if (!map.containsKey(key)) { + return false; + } + return map.get(key).containsKey(value); + } + + public boolean notContains (final V value) { + for (final CountingMap<V> innerMap : map.values()) { + if (innerMap.containsKey(value)) { + return false; + } + } + return true; + } + + @Override + public String toString () { + return map.toString(); + } + + public void clear () { + for (final CountingMap<V> value : map.values()) { + value.clear(); + } + map.clear(); + } + + public static void main (final String[] args) { + final Logger LOG = Logger.getLogger(ConcurrentCountingMap.class.getName()); + final ConcurrentCountingMap<ReefNetworkGroupCommProtos.GroupCommMessage.Type, String> strMap = new ConcurrentCountingMap<>(); + LOG.log(Level.INFO, "OUT: {0}", strMap.isEmpty()); + 
strMap.add(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST0"); + LOG.log(Level.INFO, "OUT: {0}", strMap); + strMap.add(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST1"); + LOG.log(Level.INFO, "OUT: {0}", strMap); + strMap.add(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildDead, "ST0"); + LOG.log(Level.INFO, "OUT: {0}", strMap); + strMap.add(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildDead, "ST1"); + LOG.log(Level.INFO, "OUT: {0}", strMap); + strMap.add(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST2"); + LOG.log(Level.INFO, "OUT: {0}", strMap); + strMap.add(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST3"); + LOG.log(Level.INFO, "OUT: {0}", strMap); + strMap.add(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST0"); + LOG.log(Level.INFO, "OUT: {0}", strMap); + strMap.add(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST1"); + LOG.log(Level.INFO, "OUT: {0}", strMap); + strMap.remove(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST2"); + LOG.log(Level.INFO, "OUT: {0}", strMap); + strMap.remove(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST3"); + LOG.log(Level.INFO, "OUT: {0}", strMap); + strMap.remove(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST0"); + LOG.log(Level.INFO, "OUT: {0}", strMap); + LOG.log(Level.INFO, "OUT: {0}", strMap.get(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd)); + LOG.log(Level.INFO, "OUT: {0}", strMap.get(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildDead)); + strMap.remove(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST1"); + LOG.log(Level.INFO, "OUT: {0}", strMap); + strMap.remove(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST1"); + LOG.log(Level.INFO, "OUT: {0}", strMap); + LOG.log(Level.INFO, "OUT: {0}", strMap.containsKey(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd)); + LOG.log(Level.INFO, "OUT: {0}", 
strMap.containsKey(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildDead)); + LOG.log(Level.INFO, "OUT: {0}", strMap.contains(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST0")); + LOG.log(Level.INFO, "OUT: {0}", strMap.contains(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST2")); + strMap.remove(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "ST0"); + LOG.log(Level.INFO, "OUT: {0}", strMap); + LOG.log(Level.INFO, "OUT: {0}", strMap.containsKey(ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd)); + LOG.log(Level.INFO, "OUT: {0}", strMap.isEmpty()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/CountingMap.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/CountingMap.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/CountingMap.java new file mode 100644 index 0000000..58c3f5c --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/CountingMap.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * A multiset-like map: each {@link #add} raises the count of a key by one, each
 * {@link #remove} lowers it by one, and a key disappears when its count reaches zero.
 *
 * @param <L> type of the counted keys
 */
public class CountingMap<L> {

  private final Map<L, Integer> map = new HashMap<>();

  /**
   * @param value the key to look for
   * @return {@code true} if {@code value} currently has a count
   */
  public boolean containsKey(final L value) {
    return map.containsKey(value);
  }

  /**
   * @param value the key to look up
   * @return the current count of {@code value}, or 0 when it is absent
   */
  public int get(final L value) {
    final Integer count = map.get(value);
    return (count == null) ? 0 : count;
  }

  /**
   * @return {@code true} if no key has a count
   */
  public boolean isEmpty() {
    return map.isEmpty();
  }

  /**
   * Forgets every key.
   */
  public void clear() {
    map.clear();
  }

  /**
   * Raises the count of {@code value} by one, starting from zero for a new key.
   *
   * @param value the key to count
   */
  public void add(final L value) {
    final Integer previous = map.get(value);
    final int updated = (previous == null) ? 1 : previous + 1;
    map.put(value, updated);
  }

  /**
   * Lowers the count of {@code value} by one and drops the key when the count hits zero.
   *
   * @param value the key to un-count
   * @return {@code false} if {@code value} was not present, {@code true} otherwise
   */
  public boolean remove(final L value) {
    final Integer previous = map.get(value);
    if (previous == null) {
      return false;
    }
    final int updated = previous - 1;
    if (updated != 0) {
      map.put(value, updated);
    } else {
      map.remove(value);
    }
    return true;
  }

  @Override
  public String toString() {
    return map.toString();
  }

  /**
   * Manual smoke test that logs the map after a fixed sequence of adds and removes.
   *
   * @param args ignored
   */
  public static void main(final String[] args) {
    final Logger log = Logger.getLogger(CountingMap.class.getName());
    final CountingMap<String> strMap = new CountingMap<>();
    final String[] toAdd = {"Hello", "World", "Hello", "Hello", "World!"};
    for (final String word : toAdd) {
      strMap.add(word);
      log.log(Level.INFO, "OUT: {0}", strMap);
    }
    final String[] toRemove = {"Hello", "World"};
    for (final String word : toRemove) {
      strMap.remove(word);
      log.log(Level.INFO, "OUT: {0}", strMap);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/CountingSemaphore.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/CountingSemaphore.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/CountingSemaphore.java new file mode 100644 index 0000000..3014667 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/utils/CountingSemaphore.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

/**
 * A counter guarded by a caller-supplied monitor object. Threads calling {@link #await()}
 * block on that monitor until the counter is zero or negative; {@link #decrement()}
 * notifies all waiters when the counter reaches that state.
 */
public class CountingSemaphore {

  private static final Logger LOG = Logger.getLogger(CountingSemaphore.class.getName());

  private final AtomicInteger counter;

  private final String name;

  private final Object lock;

  private final int initCount;

  /**
   * @param initCount initial value of the counter
   * @param name      prefix prepended verbatim to log messages
   * @param lock      monitor used for mutual exclusion and for wait/notify
   */
  public CountingSemaphore(final int initCount, final String name, final Object lock) {
    this.initCount = initCount;
    this.name = name;
    this.lock = lock;
    this.counter = new AtomicInteger(initCount);
    LOG.finest("Counter initialized to " + initCount);
  }

  /**
   * @return the value the counter was created with
   */
  public int getInitialCount() {
    return initCount;
  }

  /**
   * Raises the counter by one while holding the monitor.
   *
   * @return the counter value after the increment
   */
  public int increment() {
    synchronized (lock) {
      final int retVal = counter.incrementAndGet();
      LOG.finest(name + "Incremented counter to " + retVal);
      logStatus();
      return retVal;
    }
  }

  /** Logs how many workers are running versus still pending, derived from the counter. */
  private void logStatus() {
    final int yetToRun = counter.get();
    final int curRunning = initCount - yetToRun;
    LOG.fine(name + curRunning + " workers are running & " + yetToRun + " workers are yet to run");
  }

  /**
   * Lowers the counter by one while holding the monitor and wakes all waiters once the
   * counter is zero or negative.
   *
   * @return the counter value after the decrement
   */
  public int decrement() {
    synchronized (lock) {
      final int retVal = counter.decrementAndGet();
      LOG.finest(name + "Decremented counter to " + retVal);
      if (retVal < 0) {
        LOG.warning("Counter negative. More workers exist than you expected");
      }
      if (retVal <= 0) {
        LOG.finest(name + "All workers are done with their task. Notifying waiting threads");
        lock.notifyAll();
      } else {
        LOG.finest(name + "Some workers are not done yet");
      }
      logStatus();
      return retVal;
    }
  }

  /**
   * @return the current counter value, read while holding the monitor
   */
  public int get() {
    synchronized (lock) {
      return counter.get();
    }
  }

  /**
   * Blocks until the counter is zero or negative.
   *
   * @throws RuntimeException wrapping the {@link InterruptedException} if the waiting
   *                          thread is interrupted; the thread's interrupt status is
   *                          restored before the exception is thrown
   */
  public void await() {
    synchronized (lock) {
      LOG.finest(name + "Waiting for workers to be done");
      // Loop guards against spurious wake-ups and against notifications for other users
      // of the shared monitor.
      while (counter.get() > 0) {
        try {
          lock.wait();
          LOG.finest(name + "Notified with counter=" + counter.get());
        } catch (final InterruptedException e) {
          // wait() cleared the flag; restore it so callers up the stack can still see it.
          Thread.currentThread().interrupt();
          throw new RuntimeException("InterruptedException while waiting for counting semaphore counter", e);
        }
      }
      LOG.finest(name + "Returning from wait");
    }
  }

}
import java.util.concurrent.CountDownLatch;

/**
 * A {@link CountDownLatch} wrapper whose latch can be replaced with a fresh one after it
 * has been awaited.
 *
 * <p>The current latch is held in a {@code volatile} field so that a thread calling
 * {@link #countDown()} sees the latch installed by {@link #awaitAndReset(int)} on another
 * thread.
 */
public class ResettingCountDownLatch {
  private volatile CountDownLatch latch;

  /**
   * @param initialCount count of the first latch
   */
  public ResettingCountDownLatch(final int initialCount) {
    latch = new CountDownLatch(initialCount);
  }

  /**
   * Blocks until the current latch reaches zero.
   *
   * @throws RuntimeException wrapping the {@link InterruptedException} if the waiting
   *                          thread is interrupted; the interrupt status is restored first
   */
  public void await() {
    awaitCurrentLatch();
  }

  /**
   * Blocks until the current latch reaches zero, then installs a new latch.
   * If the wait is interrupted the latch is left unchanged.
   *
   * @param resetCount count of the replacement latch
   * @throws RuntimeException wrapping the {@link InterruptedException} if the waiting
   *                          thread is interrupted; the interrupt status is restored first
   */
  public void awaitAndReset(final int resetCount) {
    awaitCurrentLatch();
    latch = new CountDownLatch(resetCount);
  }

  /**
   * Counts the current latch down by one.
   */
  public void countDown() {
    latch.countDown();
  }

  /** Waits on the latch that is current at call time, translating interruption. */
  private void awaitCurrentLatch() {
    try {
      latch.await();
    } catch (final InterruptedException e) {
      // await() cleared the flag; restore it so callers up the stack can still see it.
      Thread.currentThread().interrupt();
      throw new RuntimeException("InterruptedException while waiting for latch", e);
    }
  }

}
import java.util.*;

/**
 * Map from K to {@code Set<V>}. A key exists only while its set is non-empty when values
 * are removed through {@link #remove(Object, Object)}.
 *
 * @param <K> key type
 * @param <V> element type of the per-key sets
 */
public class SetMap<K, V> {
  private final Map<K, Set<V>> map = new HashMap<>();

  /**
   * @param key the key to look for
   * @return {@code true} if {@code key} has a set
   */
  public boolean containsKey(final K key) {
    return map.containsKey(key);
  }

  /**
   * @param key   the key to look under
   * @param value the value to look for
   * @return {@code true} if {@code value} is in the set of {@code key}
   */
  public boolean contains(final K key, final V value) {
    final Set<V> values = map.get(key);
    return values != null && values.contains(value);
  }

  /**
   * @param key the key to look up
   * @return the live internal set for {@code key}, or an immutable empty set when the
   *         key is absent
   */
  public Set<V> get(final K key) {
    final Set<V> values = map.get(key);
    if (values != null) {
      return values;
    } else {
      return Collections.emptySet();
    }
  }

  /**
   * Adds {@code value} to the set of {@code key}, creating the set on first use.
   *
   * @param key   the key to add under
   * @param value the value to add
   */
  public void add(final K key, final V value) {
    Set<V> values = map.get(key);
    if (values == null) {
      values = new HashSet<>();
      map.put(key, values);
    }
    values.add(value);
  }

  /**
   * Removes {@code value} from the set of {@code key} and drops the key when its set
   * becomes empty.
   *
   * @param key   the key to remove under
   * @param value the value to remove
   * @return {@code true} if {@code value} was present under {@code key}
   */
  public boolean remove(final K key, final V value) {
    final Set<V> set = map.get(key);
    if (set == null) {
      return false;
    }
    final boolean retVal = set.remove(value);
    if (set.isEmpty()) {
      map.remove(key);
    }
    return retVal;
  }

  /**
   * @param key the key to look up
   * @return number of values stored under {@code key}, 0 when the key is absent
   */
  public int count(final K key) {
    final Set<V> values = map.get(key);
    return (values == null) ? 0 : values.size();
  }

  /**
   * Removes {@code key} together with its whole set.
   *
   * <p>The parameter is typed {@code Object}, mirroring {@link Map#remove(Object)}, so
   * this generic container does not depend on any particular key class; existing callers
   * that pass a {@code MsgKey} compile unchanged.
   *
   * @param key the key to remove
   * @return the set that was stored under {@code key}, or {@code null} if there was none
   */
  public Set<V> remove(final Object key) {
    return map.remove(key);
  }

  /**
   * @return the key set view of the backing map
   */
  public Set<K> keySet() {
    return map.keySet();
  }
}
+ */ +package org.apache.reef.io.network.group.impl.utils; + +import org.apache.reef.io.network.Message; +import org.apache.reef.io.network.group.impl.GroupCommunicationMessage; +import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos; +import org.apache.reef.tang.annotations.Name; + +import java.util.Iterator; + +/** + * + */ +public class Utils { + + public static final byte[] EmptyByteArr = new byte[0]; + + public static GroupCommunicationMessage bldVersionedGCM(final Class<? extends Name<String>> groupName, + final Class<? extends Name<String>> operName, + final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType, final String from, final int srcVersion, + final String to, final int dstVersion, final byte[]... data) { + + return new GroupCommunicationMessage(groupName.getName(), operName.getName(), msgType, from, srcVersion, to, + dstVersion, data); + } + + public static Class<? extends Name<String>> getClass(final String className) { + try { + return (Class<? extends Name<String>>) Class.forName(className); + } catch (final ClassNotFoundException e) { + throw new RuntimeException("Unable to find class " + className, e); + } + } + + public static String simpleName(final Class<?> className) { + if (className != null) { + return className.getSimpleName(); + } else { + return "NULL"; + } + } + + public static byte[] getData(final GroupCommunicationMessage gcm) { + return (gcm.getMsgsCount() == 1) ? 
gcm.getData()[0] : null; + } + + /** + * @param msg + * @return + */ + public static GroupCommunicationMessage getGCM(final Message<GroupCommunicationMessage> msg) { + final Iterator<GroupCommunicationMessage> gcmIterator = msg.getData().iterator(); + if (gcmIterator.hasNext()) { + final GroupCommunicationMessage gcm = gcmIterator.next(); + if (gcmIterator.hasNext()) { + throw new RuntimeException("Expecting exactly one GCM object inside Message but found more"); + } + return gcm; + } else { + throw new RuntimeException("Expecting exactly one GCM object inside Message but found none"); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/package-info.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/package-info.java new file mode 100644 index 0000000..4946182 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/package-info.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +/** + * Elastic Group Communications for REEF. + * + * Provides MPI style Group Communication operators for collective communication + * between tasks. These should be primarily used for any form of + * task to task messaging along with the point to point communication + * provided by {@link org.apache.reef.io.network.impl.NetworkService} + * + * The interfaces for the operators are in org.apache.reef.io.network.group.api.operators + * The fluent way to describe these operators is available in org.apache.reef.io.network.group.config + * The implementations of these operators are available in org.apache.reef.io.network.group.impl + * Currently only a basic implementation is available + */ +package org.apache.reef.io.network.group; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServer.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServer.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServer.java index 5a4c765..0737286 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServer.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServer.java @@ -20,29 +20,11 @@ package org.apache.reef.io.network.naming; import org.apache.reef.io.naming.NameAssignment; -import org.apache.reef.io.network.naming.serialization.*; -import org.apache.reef.tang.annotations.DefaultImplementation; -import org.apache.reef.tang.annotations.Parameter; -import org.apache.reef.wake.EventHandler; import org.apache.reef.wake.Identifier; -import org.apache.reef.wake.IdentifierFactory; import org.apache.reef.wake.Stage; -import org.apache.reef.wake.impl.MultiEventHandler; -import org.apache.reef.wake.impl.SyncStage; -import org.apache.reef.wake.remote.Codec; -import org.apache.reef.wake.remote.NetUtils; -import
org.apache.reef.wake.remote.impl.TransportEvent; -import org.apache.reef.wake.remote.transport.Transport; -import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport; -import org.apache.reef.webserver.AvroReefServiceInfo; -import org.apache.reef.webserver.ReefEventStateManager; -import javax.inject.Inject; -import java.io.IOException; import java.net.InetSocketAddress; -import java.util.*; -import java.util.logging.Level; -import java.util.logging.Logger; +import java.util.List; /** * Naming server interface http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java index 4cb0cc6..ff241a0 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java @@ -24,7 +24,6 @@ import org.apache.reef.tang.annotations.Parameter; import org.apache.reef.wake.EventHandler; import org.apache.reef.wake.Identifier; import org.apache.reef.wake.IdentifierFactory; -import org.apache.reef.wake.Stage; import org.apache.reef.wake.impl.MultiEventHandler; import org.apache.reef.wake.impl.SyncStage; import org.apache.reef.wake.remote.Codec; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/Utils.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/Utils.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/Utils.java new file mode 100644 index 0000000..3e58f36 --- /dev/null +++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/util/Utils.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.util; + +import com.google.protobuf.ByteString; +import org.apache.commons.lang.StringUtils; +import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos; +import org.apache.reef.wake.ComparableIdentifier; +import org.apache.reef.wake.Identifier; +import org.apache.reef.wake.IdentifierFactory; + +import java.net.Inet4Address; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +public class Utils { + + private static final String DELIMITER = "-"; + + /** + * TODO: Merge with parseListCmp() into one generic implementation. + */ + public static List<Identifier> parseList( + final String ids, final IdentifierFactory factory) { + final List<Identifier> result = new ArrayList<>(); + for (final String token : ids.split(DELIMITER)) { + result.add(factory.getNewInstance(token.trim())); + } + return result; + } + + /** + * TODO: Merge with parseList() into one generic implementation. 
+ */ + public static List<ComparableIdentifier> parseListCmp( + final String ids, final IdentifierFactory factory) { + final List<ComparableIdentifier> result = new ArrayList<>(); + for (final String token : ids.split(DELIMITER)) { + result.add((ComparableIdentifier) factory.getNewInstance(token.trim())); + } + return result; + } + + public static String listToString(final List<ComparableIdentifier> ids) { + return StringUtils.join(ids, DELIMITER); + } + + public static List<Integer> createUniformCounts(final int elemSize, final int childSize) { + final int remainder = elemSize % childSize; + final int quotient = elemSize / childSize; + final ArrayList<Integer> result = new ArrayList<>(childSize); + result.addAll(Collections.nCopies(remainder, quotient + 1)); + result.addAll(Collections.nCopies(childSize - remainder, quotient)); + return Collections.unmodifiableList(result); + } + + private static class AddressComparator implements Comparator<Inet4Address> { + @Override + public int compare(final Inet4Address aa, final Inet4Address ba) { + final byte[] a = aa.getAddress(); + final byte[] b = ba.getAddress(); + // local subnet comes after all else. + if (a[0] == 127 && b[0] != 127) { + return 1; + } + if (a[0] != 127 && b[0] == 127) { + return -1; + } + for (int i = 0; i < 4; i++) { + if (a[i] < b[i]) { + return -1; + } + if (a[i] > b[i]) { + return 1; + } + } + return 0; + } + } + + public static ReefNetworkGroupCommProtos.GroupCommMessage bldGCM( + final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType, + final Identifier from, final Identifier to, final byte[]... 
elements) { + + final ReefNetworkGroupCommProtos.GroupCommMessage.Builder GCMBuilder = + ReefNetworkGroupCommProtos.GroupCommMessage.newBuilder() + .setType(msgType) + .setSrcid(from.toString()) + .setDestid(to.toString()); + + final ReefNetworkGroupCommProtos.GroupMessageBody.Builder bodyBuilder = + ReefNetworkGroupCommProtos.GroupMessageBody.newBuilder(); + + for (final byte[] element : elements) { + bodyBuilder.setData(ByteString.copyFrom(element)); + GCMBuilder.addMsgs(bodyBuilder.build()); + } + + return GCMBuilder.build(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/RamMap.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/RamMap.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/RamMap.java index 35a20e1..ffceb5d 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/RamMap.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/storage/ram/RamMap.java @@ -33,12 +33,11 @@ import java.util.concurrent.ConcurrentSkipListMap; * default ExternalMap provided by StorageManagerRam. */ public class RamMap<T> implements ExternalMap<T> { - private final ConcurrentSkipListMap<CharSequence, T> map - = new ConcurrentSkipListMap<CharSequence, T>(); + + private final ConcurrentSkipListMap<CharSequence, T> map = new ConcurrentSkipListMap<CharSequence, T>(); @Inject public RamMap(RamStorageService ramStore) { - //this.localStore = localStore; } @Override @@ -70,5 +69,4 @@ public class RamMap<T> implements ExternalMap<T> { public Iterable<Entry<CharSequence, T>> getAll(Set<? 
extends CharSequence> keys) { return new GetAllIterable<>(keys, this); } - } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/proto/group_comm_protocol.proto ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/proto/group_comm_protocol.proto b/lang/java/reef-io/src/main/proto/group_comm_protocol.proto new file mode 100644 index 0000000..7e2f60f --- /dev/null +++ b/lang/java/reef-io/src/main/proto/group_comm_protocol.proto @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +option java_package = "org.apache.reef.io.network.proto"; +option java_outer_classname = "ReefNetworkGroupCommProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; + /* GroupCommMessage: one group communication message. It carries a required type, required source and destination ids, optional group name, operator name and version numbers, and zero or more GroupMessageBody payloads. */ +message GroupCommMessage { /* Type values 1-7 are named after the collective operators; values 8-21 are named after membership and topology events (presumably control traffic; confirm against the handlers). */ + enum Type { + Scatter=1; + Gather=2; + Broadcast=3; + Reduce=4; + AllGather=5; + AllReduce=6; + ReduceScatter=7; + SourceDead=8; + SourceAdd=9; + ParentAdd=10; + ChildAdd=11; + ParentDead=12; + ChildDead=13; + ParentAdded=14; + ChildAdded=15; + ParentRemoved=16; + ChildRemoved=17; + TopologySetup=18; + UpdateTopology=19; + TopologyUpdated=20; + TopologyChanges=21; + } + + // identifies which field is filled in + required Type type = 1; + + required string srcid = 2; + required string destid = 3; + optional string groupname = 4; + optional string operatorname = 5; + optional int32 version = 6; + optional int32 srcVersion = 7; + repeated GroupMessageBody msgs = 8; +} + /* GroupMessageBody: a single payload, held as one required bytes field. */ +message GroupMessageBody { + required bytes data = 1; +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/test/java/org/apache/reef/io/network/group/GroupCommunicationMessageCodecTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/group/GroupCommunicationMessageCodecTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/group/GroupCommunicationMessageCodecTest.java new file mode 100644 index 0000000..a83daa6 --- /dev/null +++ b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/group/GroupCommunicationMessageCodecTest.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.group; + +import org.apache.reef.io.network.group.impl.GroupCommunicationMessage; +import org.apache.reef.io.network.group.impl.GroupCommunicationMessageCodec; +import org.apache.reef.io.network.group.impl.utils.Utils; +import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; +import org.apache.reef.tang.exceptions.InjectionException; +import org.junit.Assert; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.util.Random; + +/** + * + */ +public class GroupCommunicationMessageCodecTest { + + @NamedParameter + class GroupName implements Name<String> { + } + + @NamedParameter + class OperName implements Name<String> { + } + + @Test(timeout = 100) + public final void testInstantiation() throws InjectionException { + final GroupCommunicationMessageCodec codec = Tang.Factory.getTang().newInjector().getInstance(GroupCommunicationMessageCodec.class); + Assert.assertNotNull("tang.getInstance(GroupCommunicationMessageCodec.class): ", codec); + } + + @Test(timeout = 100) + public final void testEncodeDecode() { + final Random r = new Random(); + final byte[] data = new 
byte[100]; + r.nextBytes(data); + final GroupCommunicationMessage expMsg = Utils.bldVersionedGCM(GroupName.class, OperName.class, ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, "From", 0, "To", 1, data); + final GroupCommunicationMessageCodec codec = new GroupCommunicationMessageCodec(); + final GroupCommunicationMessage actMsg1 = codec.decode(codec.encode(expMsg)); + Assert.assertEquals("decode(encode(msg)): ", expMsg, actMsg1); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final DataOutputStream daos = new DataOutputStream(baos); + codec.encodeToStream(expMsg, daos); + final GroupCommunicationMessage actMsg2 = codec.decodeFromStream(new DataInputStream(new ByteArrayInputStream(baos.toByteArray()))); + Assert.assertEquals("decodeFromStream(encodeToStream(msg)): ", expMsg, actMsg2); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/TestUtils.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/TestUtils.java b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/TestUtils.java new file mode 100644 index 0000000..a41cf04 --- /dev/null +++ b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/TestUtils.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network.util; + +import com.google.protobuf.ByteString; +import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos; +import org.apache.reef.wake.Identifier; + +public class TestUtils { /* static helpers that build and classify GroupCommMessage protos */ + public static ReefNetworkGroupCommProtos.GroupCommMessage bldGCM(final ReefNetworkGroupCommProtos.GroupCommMessage.Type msgType, final Identifier from, final Identifier to, final byte[]... elements) { /* builds a message of msgType with srcid = from.toString(), destid = to.toString() and one body per element */ + final ReefNetworkGroupCommProtos.GroupCommMessage.Builder GCMBuilder = ReefNetworkGroupCommProtos.GroupCommMessage.newBuilder(); + GCMBuilder.setType(msgType); + GCMBuilder.setSrcid(from.toString()); + GCMBuilder.setDestid(to.toString()); + final ReefNetworkGroupCommProtos.GroupMessageBody.Builder bodyBuilder = ReefNetworkGroupCommProtos.GroupMessageBody.newBuilder(); + for (final byte[] element : elements) { /* the single body builder is reused: setData is called again before every build */ + bodyBuilder.setData(ByteString.copyFrom(element)); + GCMBuilder.addMsgs(bodyBuilder.build()); + } + final ReefNetworkGroupCommProtos.GroupCommMessage msg = GCMBuilder.build(); + return msg; + } + + /** Decides whether a message type counts as a control message as this helper defines it: any type the switch does not list explicitly. + * @param type the message type to classify + * @return {@code false} for AllGather, AllReduce, Broadcast, Gather, Reduce, ReduceScatter and Scatter; {@code true} for every other type + */ + public static boolean controlMessage(final ReefNetworkGroupCommProtos.GroupCommMessage.Type type) { + + switch (type) { + case AllGather: + case AllReduce: + case Broadcast: + case Gather: + case Reduce: + case ReduceScatter: + case Scatter: + return false; + + default: + return true; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java 
---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java index 96cf7d4..17765f1 100644 --- a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java +++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java @@ -28,8 +28,6 @@ import org.apache.reef.io.network.naming.NameServerImpl; import org.apache.reef.io.network.util.StringIdentifierFactory; import org.apache.reef.services.network.util.Monitor; import org.apache.reef.services.network.util.StringCodec; -import org.apache.reef.tang.Injector; -import org.apache.reef.tang.Tang; import org.apache.reef.wake.EventHandler; import org.apache.reef.wake.Identifier; import org.apache.reef.wake.IdentifierFactory; @@ -37,7 +35,7 @@ import org.apache.reef.wake.remote.NetUtils; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; -import org.junit.Assert; + import java.net.InetSocketAddress; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; @@ -55,8 +53,6 @@ public class NetworkServiceTest { /** * NetworkService messaging test - * - * @throws Exception */ @Test public void testMessagingNetworkService() throws Exception { @@ -113,8 +109,6 @@ public class NetworkServiceTest { /** * NetworkService messaging rate benchmark - * - * @throws Exception */ @Test public void testMessagingNetworkServiceRate() throws Exception { @@ -187,8 +181,6 @@ public class NetworkServiceTest { /** * NetworkService messaging rate benchmark - * - * @throws Exception */ @Test public void testMessagingNetworkServiceRateDisjoint() throws Exception { @@ -372,8 +364,6 @@ public class NetworkServiceTest { /** * NetworkService messaging rate benchmark - * - * @throws Exception */ @Test public void 
testMessagingNetworkServiceBatchingRate() throws Exception { @@ -452,8 +442,6 @@ public class NetworkServiceTest { /** * Test message handler - * - * @param <T> type */ class MessageHandler<T> implements EventHandler<Message<T>> { @@ -470,13 +458,17 @@ public class NetworkServiceTest { @Override public void onNext(Message<T> value) { + count.incrementAndGet(); - //System.out.print(name + " received " + value.getData() + " from " + value.getSrcId() + " to " + value.getDestId()); - for (T obj : value.getData()) { - // System.out.print(" data: " + obj); + LOG.log(Level.FINEST, + "OUT: {0} received {1} from {2} to {3}", + new Object[] { name, value.getData(), value.getSrcId(), value.getDestId() }); + + for (final T obj : value.getData()) { + LOG.log(Level.FINEST, "OUT: data: {0}", obj); } - //LOG.log(Level.FINEST, ); + if (count.get() == expected) { monitor.mnotify(); }
