http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeImpl.java new file mode 100644 index 0000000..7483483 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeImpl.java @@ -0,0 +1,476 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.io.network.group.impl.driver; + +import org.apache.reef.io.network.group.api.driver.TaskNode; +import org.apache.reef.io.network.group.api.driver.TaskNodeStatus; +import org.apache.reef.io.network.group.impl.GroupCommunicationMessage; +import org.apache.reef.io.network.group.impl.utils.Utils; +import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos; +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.wake.EStage; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Logger; + +public class TaskNodeImpl implements TaskNode { + + private static final Logger LOG = Logger.getLogger(TaskNodeImpl.class.getName()); + + private final EStage<GroupCommunicationMessage> senderStage; + private final Class<? extends Name<String>> groupName; + private final Class<? extends Name<String>> operName; + private final String taskId; + private final String driverId; + + private final boolean isRoot; + private TaskNode parent; + private TaskNode sibling; + private final List<TaskNode> children = new ArrayList<>(); + + private final AtomicBoolean running = new AtomicBoolean(false); + private final AtomicBoolean topoSetupSent = new AtomicBoolean(false); + + private final TaskNodeStatus taskNodeStatus; + + private final AtomicInteger version = new AtomicInteger(0); + + public TaskNodeImpl(final EStage<GroupCommunicationMessage> senderStage, + final Class<? extends Name<String>> groupName, final Class<? 
extends Name<String>> operatorName, + final String taskId, final String driverId, final boolean isRoot) { + this.senderStage = senderStage; + this.groupName = groupName; + this.operName = operatorName; + this.taskId = taskId; + this.driverId = driverId; + this.isRoot = isRoot; + taskNodeStatus = new TaskNodeStatusImpl(groupName, operatorName, taskId, this); + } + + @Override + public void setSibling(final TaskNode leaf) { + LOG.entering("TaskNodeImpl", "setSibling", new Object[]{getQualifiedName(), leaf}); + sibling = leaf; + LOG.exiting("TaskNodeImpl", "setSibling", getQualifiedName()); + } + + @Override + public int getNumberOfChildren() { + LOG.entering("TaskNodeImpl", "getNumberOfChildren", getQualifiedName()); + final int size = children.size(); + LOG.exiting("TaskNodeImpl", "getNumberOfChildren", getQualifiedName() + size); + return size; + } + + @Override + public TaskNode successor() { + LOG.entering("TaskNodeImpl", "successor", getQualifiedName()); + LOG.exiting("TaskNodeImpl", "successor", getQualifiedName() + sibling); + return sibling; + } + + @Override + public String toString() { + return "(" + taskId + "," + version.get() + ")"; + } + + /** + * * Methods pertaining to my status change *** + */ + @Override + public void onFailedTask() { + LOG.entering("TaskNodeImpl", "onFailedTask", getQualifiedName()); + if (!running.compareAndSet(true, false)) { + LOG.fine(getQualifiedName() + "Trying to set failed on an already failed task. Something fishy!!!"); + LOG.exiting("TaskNodeImpl", "onFailedTask", getQualifiedName() + "Trying to set failed on an already failed task. 
Something fishy!!!"); + return; + } + taskNodeStatus.clearStateAndReleaseLocks(); + LOG.finest(getQualifiedName() + "Changed status to failed."); + LOG.finest(getQualifiedName() + "Resetting topoSetupSent to false"); + topoSetupSent.set(false); + if (parent != null && parent.isRunning()) { + parent.onChildDead(taskId); + } else { + LOG.finest(getQualifiedName() + "Skipping asking parent to process child death"); + } + for (final TaskNode child : children) { + if (child.isRunning()) { + child.onParentDead(); + } + } + final int version = this.version.incrementAndGet(); + LOG.finest(getQualifiedName() + "Bumping up to version-" + version); + LOG.exiting("TaskNodeImpl", "onFailedTask", getQualifiedName()); + } + + @Override + public void onRunningTask() { + LOG.entering("TaskNodeImpl", "onRunningTask", getQualifiedName()); + if (!running.compareAndSet(false, true)) { + LOG.fine(getQualifiedName() + "Trying to set running on an already running task. Something fishy!!!"); + LOG.exiting("TaskNodeImpl", "onRunningTask", getQualifiedName() + "Trying to set running on an already running task. 
Something fishy!!!"); + return; + } + final int version = this.version.get(); + LOG.finest(getQualifiedName() + "Changed status to running version-" + version); + if (parent != null && parent.isRunning()) { + final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentAdd, parent.getTaskId(), + parent.getVersion(), taskId, + version, Utils.EmptyByteArr); + taskNodeStatus.expectAckFor(gcm.getType(), gcm.getSrcid()); + senderStage.onNext(gcm); + parent.onChildRunning(taskId); + } else { + LOG.finest(getQualifiedName() + "Skipping src add to & for parent"); + } + for (final TaskNode child : children) { + if (child.isRunning()) { + final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, child.getTaskId(), + child.getVersion(), taskId, version, + Utils.EmptyByteArr); + taskNodeStatus.expectAckFor(gcm.getType(), gcm.getSrcid()); + senderStage.onNext(gcm); + child.onParentRunning(); + } + } + LOG.exiting("TaskNodeImpl", "onRunningTask", getQualifiedName()); + } + + /** + * * Methods pertaining to my status change ends *** + */ + + @Override + public void onParentRunning() { + LOG.entering("TaskNodeImpl", "onParentRunning", getQualifiedName()); + if (parent != null && parent.isRunning()) { + final int parentVersion = parent.getVersion(); + final String parentTaskId = parent.getTaskId(); + final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentAdd, parentTaskId, + parentVersion, taskId, version.get(), + Utils.EmptyByteArr); + taskNodeStatus.expectAckFor(gcm.getType(), gcm.getSrcid()); + senderStage.onNext(gcm); + } else { + LOG.finer(getQualifiedName() + "Parent was running when I was asked to add him." + + " However, he is not active anymore. Returning without sending ParentAdd" + " msg. 
***CHECK***"); + } + LOG.exiting("TaskNodeImpl", "onParentRunning", getQualifiedName()); + } + + @Override + public void onParentDead() { + LOG.entering("TaskNodeImpl", "onParentDead", getQualifiedName()); + if (parent != null) { + final int parentVersion = parent.getVersion(); + final String parentTaskId = parent.getTaskId(); + taskNodeStatus.updateFailureOf(parent.getTaskId()); + final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.ParentDead, parentTaskId, + parentVersion, taskId, version.get(), + Utils.EmptyByteArr); + taskNodeStatus.expectAckFor(gcm.getType(), gcm.getSrcid()); + senderStage.onNext(gcm); + } else { + throw new RuntimeException(getQualifiedName() + "Don't expect parent to be null. Something wrong"); + } + LOG.exiting("TaskNodeImpl", "onParentDead", getQualifiedName()); + } + + @Override + public void onChildRunning(final String childId) { + LOG.entering("TaskNodeImpl", "onChildRunning", new Object[]{getQualifiedName(), childId}); + final TaskNode childTask = findTask(childId); + if (childTask != null && childTask.isRunning()) { + final int childVersion = childTask.getVersion(); + final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildAdd, childId, + childVersion, taskId, version.get(), + Utils.EmptyByteArr); + taskNodeStatus.expectAckFor(gcm.getType(), gcm.getSrcid()); + senderStage.onNext(gcm); + } else { + LOG.fine(getQualifiedName() + childId + " was running when I was asked to add him." + + " However, I can't find a task corresponding to him now." + + " Returning without sending ChildAdd msg. 
***CHECK***"); + } + LOG.exiting("TaskNodeImpl", "onChildRunning", getQualifiedName() + childId); + } + + @Override + public void onChildDead(final String childId) { + LOG.entering("TaskNodeImpl", "onChildDead", new Object[]{getQualifiedName(), childId}); + final TaskNode childTask = findChildTask(childId); + if (childTask != null) { + final int childVersion = childTask.getVersion(); + taskNodeStatus.updateFailureOf(childId); + final GroupCommunicationMessage gcm = Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.ChildDead, childId, + childVersion, taskId, version.get(), + Utils.EmptyByteArr); + taskNodeStatus.expectAckFor(gcm.getType(), gcm.getSrcid()); + senderStage.onNext(gcm); + } else { + throw new RuntimeException(getQualifiedName() + "Don't expect task for " + childId + " to be null. Something wrong"); + } + LOG.exiting("TaskNodeImpl", "onChildDead", getQualifiedName() + childId); + } + + /** + * * Methods pertaining to my neighbors status change ends *** + */ + + @Override + public void onReceiptOfAcknowledgement(final GroupCommunicationMessage msg) { + LOG.entering("TaskNodeImpl", "onReceiptOfAcknowledgement", new Object[]{getQualifiedName(), msg}); + taskNodeStatus.processAcknowledgement(msg); + LOG.exiting("TaskNodeImpl", "onReceiptOfAcknowledgement", getQualifiedName() + msg); + } + + @Override + public void updatingTopology() { + LOG.entering("TaskNodeImpl", "updatingTopology", getQualifiedName()); + taskNodeStatus.updatingTopology(); + LOG.exiting("TaskNodeImpl", "updatingTopology", getQualifiedName()); + } + + @Override + public String getTaskId() { + return taskId; + } + + @Override + public void addChild(final TaskNode child) { + LOG.entering("TaskNodeImpl", "addChild", new Object[]{getQualifiedName(), child.getTaskId()}); + children.add(child); + LOG.exiting("TaskNodeImpl", "addChild", getQualifiedName() + child); + } + + @Override + public void removeChild(final TaskNode child) { + 
LOG.entering("TaskNodeImpl", "removeChild", new Object[]{getQualifiedName(), child.getTaskId()}); + children.remove(child); + LOG.exiting("TaskNodeImpl", "removeChild", getQualifiedName() + child); + } + + @Override + public void setParent(final TaskNode parent) { + LOG.entering("TaskNodeImpl", "setParent", new Object[]{getQualifiedName(), parent}); + this.parent = parent; + LOG.exiting("TaskNodeImpl", "setParent", getQualifiedName() + parent); + } + + @Override + public boolean isRunning() { + LOG.entering("TaskNodeImpl", "isRunning", getQualifiedName()); + final boolean b = running.get(); + LOG.exiting("TaskNodeImpl", "isRunning", getQualifiedName() + b); + return b; + } + + @Override + public TaskNode getParent() { + LOG.entering("TaskNodeImpl", "getParent", getQualifiedName()); + LOG.exiting("TaskNodeImpl", "getParent", getQualifiedName() + parent); + return parent; + } + + private String getQualifiedName() { + return Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + ":(" + taskId + "," + getVersion() + ") - "; + } + + @Override + public boolean isNeighborActive(final String neighborId) { + LOG.entering("TaskNodeImpl", "isNeighborActive", new Object[]{getQualifiedName(), neighborId}); + final boolean active = taskNodeStatus.isActive(neighborId); + LOG.exiting("TaskNodeImpl", "isNeighborActive", getQualifiedName() + active); + return active; + } + + @Override + public boolean resetTopologySetupSent() { + LOG.entering("TaskNodeImpl", "resetTopologySetupSent", new Object[]{getQualifiedName(),}); + final boolean retVal = topoSetupSent.compareAndSet(true, false); + LOG.exiting("TaskNodeImpl", "resetTopologySetupSent", getQualifiedName() + retVal); + return retVal; + } + + @Override + public void checkAndSendTopologySetupMessage() { + LOG.entering("TaskNodeImpl", "checkAndSendTopologySetupMessage", getQualifiedName()); + if (!topoSetupSent.get() + && (parentActive() && activeNeighborOfParent()) + && (allChildrenActive() && 
activeNeighborOfAllChildren())) { + sendTopoSetupMsg(); + } + LOG.exiting("TaskNodeImpl", "checkAndSendTopologySetupMessage", getQualifiedName()); + } + + private void sendTopoSetupMsg() { + LOG.entering("TaskNodeImpl", "sendTopoSetupMsg", getQualifiedName() + taskId); + LOG.fine(getQualifiedName() + "is an active participant in the topology"); + senderStage.onNext(Utils.bldVersionedGCM(groupName, operName, ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologySetup, driverId, 0, taskId, + version.get(), Utils.EmptyByteArr)); + taskNodeStatus.onTopologySetupMessageSent(); + final boolean sentAlready = !topoSetupSent.compareAndSet(false, true); + if (sentAlready) { + LOG.fine(getQualifiedName() + "TopologySetup msg was sent more than once. Something fishy!!!"); + } + LOG.exiting("TaskNodeImpl", "sendTopoSetupMsg", getQualifiedName()); + } + + @Override + public void checkAndSendTopologySetupMessageFor(final String source) { + LOG.entering("TaskNodeImpl", "checkAndSendTopologySetupMessageFor", new Object[]{getQualifiedName(), source}); + final TaskNode srcNode = findTask(source); + if (srcNode != null) { + srcNode.checkAndSendTopologySetupMessage(); + } + LOG.exiting("TaskNodeImpl", "checkAndSendTopologySetupMessageFor", getQualifiedName() + source); + } + + /** + * @param sourceId + * @return + */ + private TaskNode findTask(final String sourceId) { + LOG.entering("TaskNodeImpl", "findTask", new Object[]{getQualifiedName(), sourceId}); + final TaskNode retNode; + if (parent != null && parent.getTaskId().equals(sourceId)) { + retNode = parent; + } else { + retNode = findChildTask(sourceId); + } + LOG.exiting("TaskNodeImpl", "findTask", getQualifiedName() + retNode); + return retNode; + } + + private TaskNode findChildTask(final String sourceId) { + LOG.entering("TaskNodeImpl", "findChildTask", new Object[]{getQualifiedName(), sourceId}); + TaskNode retNode = null; + for (final TaskNode child : children) { + if (child.getTaskId().equals(sourceId)) { + retNode = 
child; + break; + } + } + LOG.exiting("TaskNodeImpl", "findChildTask", getQualifiedName() + retNode); + return retNode; + } + + private boolean parentActive() { + LOG.entering("TaskNodeImpl", "parentActive", getQualifiedName()); + if (isRoot) { + LOG.exiting("TaskNodeImpl", "parentActive", Arrays.toString(new Object[]{true, getQualifiedName(), "I am root. Will never have parent. So signalling active"})); + return true; + } + if (isNeighborActive(parent.getTaskId())) { + LOG.exiting("TaskNodeImpl", "parentActive", Arrays.toString(new Object[]{true, getQualifiedName(), parent, " is an active neghbor"})); + return true; + } + LOG.exiting("TaskNodeImpl", "parentActive", getQualifiedName() + "Neither root Nor is " + parent + " an active neghbor"); + return false; + } + + private boolean activeNeighborOfParent() { + LOG.entering("TaskNodeImpl", "activeNeighborOfParent", getQualifiedName()); + if (isRoot) { + LOG.exiting("TaskNodeImpl", "activeNeighborOfParent", Arrays.toString(new Object[]{true, getQualifiedName(), "I am root. Will never have parent. 
So signalling active"})); + return true; + } + if (parent.isNeighborActive(taskId)) { + LOG.exiting("TaskNodeImpl", "activeNeighborOfParent", Arrays.toString(new Object[]{true, getQualifiedName(), "I am an active neighbor of parent ", parent})); + return true; + } + LOG.exiting("TaskNodeImpl", "activeNeighborOfParent", Arrays.toString(new Object[]{false, getQualifiedName(), "Neither is parent null Nor am I an active neighbor of parent ", parent})); + return false; + } + + private boolean allChildrenActive() { + LOG.entering("TaskNodeImpl", "allChildrenActive", getQualifiedName()); + for (final TaskNode child : children) { + final String childId = child.getTaskId(); + if (child.isRunning() && !isNeighborActive(childId)) { + LOG.exiting("TaskNodeImpl", "allChildrenActive", Arrays.toString(new Object[]{false, getQualifiedName(), childId, " not active yet"})); + return false; + } + } + LOG.exiting("TaskNodeImpl", "allChildrenActive", Arrays.toString(new Object[]{true, getQualifiedName(), "All children active"})); + return true; + } + + private boolean activeNeighborOfAllChildren() { + LOG.entering("TaskNodeImpl", "activeNeighborOfAllChildren", getQualifiedName()); + for (final TaskNode child : children) { + if (child.isRunning() && !child.isNeighborActive(taskId)) { + LOG.exiting("TaskNodeImpl", "activeNeighborOfAllChildren", Arrays.toString(new Object[]{false, getQualifiedName(), "Not an active neighbor of child ", child})); + return false; + } + } + LOG.exiting("TaskNodeImpl", "activeNeighborOfAllChildren", Arrays.toString(new Object[]{true, getQualifiedName(), "Active neighbor of all children"})); + return true; + } + + @Override + public void waitForTopologySetupOrFailure() { + LOG.entering("TaskNodeImpl", "waitForTopologySetupOrFailure", getQualifiedName()); + taskNodeStatus.waitForTopologySetup(); + LOG.exiting("TaskNodeImpl", "waitForTopologySetupOrFailure", getQualifiedName()); + } + + @Override + public boolean hasChanges() { + LOG.entering("TaskNodeImpl", 
"hasChanges", getQualifiedName()); + final boolean changes = taskNodeStatus.hasChanges(); + LOG.exiting("TaskNodeImpl", "hasChanges", getQualifiedName() + changes); + return changes; + } + + @Override + public int getVersion() { + return version.get(); + } + + @Override + public int hashCode() { + int r = taskId.hashCode(); + r = 31 * r + version.get(); + return r; + } + + @Override + public boolean equals(final Object obj) { + if (obj != this) { + if (obj instanceof TaskNodeImpl) { + final TaskNodeImpl that = (TaskNodeImpl) obj; + if (this.taskId.equals(that.taskId) && this.version.get() == that.version.get()) { + return true; + } else { + return false; + } + } else { + return false; + } + } else { + return true; + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeStatusImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeStatusImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeStatusImpl.java new file mode 100644 index 0000000..3f904e2 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskNodeStatusImpl.java @@ -0,0 +1,267 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.io.network.group.impl.driver; + +import org.apache.reef.io.network.group.api.driver.TaskNode; +import org.apache.reef.io.network.group.api.driver.TaskNodeStatus; +import org.apache.reef.io.network.group.impl.GroupCommunicationMessage; +import org.apache.reef.io.network.group.impl.utils.ConcurrentCountingMap; +import org.apache.reef.io.network.group.impl.utils.CountingMap; +import org.apache.reef.io.network.group.impl.utils.Utils; +import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos.GroupCommMessage.Type; +import org.apache.reef.tang.annotations.Name; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Logger; + +public class TaskNodeStatusImpl implements TaskNodeStatus { + + private static final Logger LOG = Logger.getLogger(TaskNodeStatusImpl.class.getName()); + + private final ConcurrentCountingMap<Type, String> statusMap = new ConcurrentCountingMap<>(); + private final Class<? extends Name<String>> groupName; + private final Class<? extends Name<String>> operName; + private final String taskId; + private final Set<String> activeNeighbors = new HashSet<>(); + private final CountingMap<String> neighborStatus = new CountingMap<>(); + private final AtomicBoolean updatingTopo = new AtomicBoolean(false); + private final Object topoUpdateStageLock = new Object(); + private final Object topoSetupSentLock = new Object(); + private final TaskNode node; + + public TaskNodeStatusImpl(final Class<? extends Name<String>> groupName, + final Class<? 
extends Name<String>> operName, final String taskId, final TaskNode node) { + this.groupName = groupName; + this.operName = operName; + this.taskId = taskId; + this.node = node; + } + + private boolean isDeadMsg(final Type msgAcked) { + return msgAcked == Type.ParentDead || msgAcked == Type.ChildDead; + } + + private boolean isAddMsg(final Type msgAcked) { + return msgAcked == Type.ParentAdd || msgAcked == Type.ChildAdd; + } + + private Type getAckedMsg(final Type msgType) { + switch (msgType) { + case ParentAdded: + return Type.ParentAdd; + case ChildAdded: + return Type.ChildAdd; + case ParentRemoved: + return Type.ParentDead; + case ChildRemoved: + return Type.ChildDead; + default: + return msgType; + } + } + + private void chkIamActiveToSendTopoSetup(final Type msgDealt) { + LOG.entering("TaskNodeStatusImpl", "chkAndSendTopoSetup", new Object[]{getQualifiedName(), msgDealt}); + if (statusMap.isEmpty()) { + LOG.finest(getQualifiedName() + "Empty status map."); + node.checkAndSendTopologySetupMessage(); + } else { + LOG.finest(getQualifiedName() + "Status map non-empty" + statusMap); + } + LOG.exiting("TaskNodeStatusImpl", "chkAndSendTopoSetup", getQualifiedName() + msgDealt); + } + + @Override + public void onTopologySetupMessageSent() { + LOG.entering("TaskNodeStatusImpl", "onTopologySetupMessageSent", getQualifiedName()); + neighborStatus.clear(); + synchronized (topoSetupSentLock) { + topoSetupSentLock.notifyAll(); + } + LOG.exiting("TaskNodeStatusImpl", "onTopologySetupMessageSent", getQualifiedName()); + } + + @Override + public boolean isActive(final String neighborId) { + LOG.entering("TaskNodeStatusImpl", "isActive", new Object[]{getQualifiedName(), neighborId}); + final boolean contains = activeNeighbors.contains(neighborId); + LOG.exiting("TaskNodeStatusImpl", "isActive", getQualifiedName() + contains); + return contains; + } + + /** + * This needs to happen in line rather than in a stage because we need to note + * the messages we send to the tasks 
before we start processing msgs from the + * nodes.(Acks & Topology msgs) + */ + @Override + public void expectAckFor(final Type msgType, final String srcId) { + LOG.entering("TaskNodeStatusImpl", "expectAckFor", new Object[]{getQualifiedName(), msgType, srcId}); + LOG.finest(getQualifiedName() + "Adding " + srcId + " to sources"); + statusMap.add(msgType, srcId); + LOG.exiting("TaskNodeStatusImpl", "expectAckFor", getQualifiedName() + "Sources from which ACKs for " + msgType + " are expected: " + statusMap.get(msgType)); + } + + @Override + public void clearStateAndReleaseLocks() { + LOG.entering("TaskNodeStatusImpl", "clearStateAndReleaseLocks", getQualifiedName()); + statusMap.clear(); + activeNeighbors.clear(); + neighborStatus.clear(); + updatingTopo.compareAndSet(true, false); + synchronized (topoSetupSentLock) { + topoSetupSentLock.notifyAll(); + } + synchronized (topoUpdateStageLock) { + topoUpdateStageLock.notifyAll(); + } + LOG.exiting("TaskNodeStatusImpl", "clearStateAndReleaseLocks", getQualifiedName()); + } + + @Override + public void updateFailureOf(final String taskId) { + LOG.entering("TaskNodeStatusImpl", "updateFailureOf", new Object[]{getQualifiedName(), taskId}); + activeNeighbors.remove(taskId); + neighborStatus.remove(taskId); + LOG.exiting("TaskNodeStatusImpl", "updateFailureOf", getQualifiedName()); + } + + @Override + public void processAcknowledgement(final GroupCommunicationMessage gcm) { + LOG.entering("TaskNodeStatusImpl", "processMsg", new Object[]{getQualifiedName(), gcm}); + final String self = gcm.getSrcid(); + final Type msgType = gcm.getType(); + final Type msgAcked = getAckedMsg(msgType); + final String sourceId = gcm.getDestid(); + switch (msgType) { + case TopologySetup: + synchronized (topoUpdateStageLock) { + if (!updatingTopo.compareAndSet(true, false)) { + LOG.fine(getQualifiedName() + "Was expecting updateTopo to be true but it was false"); + } + topoUpdateStageLock.notifyAll(); + } + break; + case ParentAdded: + case 
ChildAdded: + case ParentRemoved: + case ChildRemoved: + processNeighborAcks(gcm, msgType, msgAcked, sourceId); + break; + + default: + LOG.fine(getQualifiedName() + "Non ACK msg " + gcm.getType() + " for " + gcm.getDestid() + " unexpected"); + break; + } + LOG.exiting("TaskNodeStatusImpl", "processMsg", getQualifiedName()); + } + + private void processNeighborAcks(final GroupCommunicationMessage gcm, final Type msgType, final Type msgAcked, + final String sourceId) { + LOG.entering("TaskNodeStatusImpl", "processNeighborAcks", getQualifiedName() + gcm); + if (statusMap.containsKey(msgAcked)) { + if (statusMap.contains(msgAcked, sourceId)) { + statusMap.remove(msgAcked, sourceId); + updateNeighborStatus(msgAcked, sourceId); + checkNeighborActiveToSendTopoSetup(sourceId); + chkIamActiveToSendTopoSetup(msgAcked); + } else { + LOG.fine(getQualifiedName() + "NodeStatusMsgProcessorStage Got " + msgType + " from a source(" + sourceId + + ") to whom ChildAdd was not sent. " + + "Perhaps reset during failure. If context not indicative use ***CAUTION***"); + } + } else { + LOG.fine(getQualifiedName() + "NodeStatusMsgProcessorStage There were no " + msgAcked + + " msgs sent in the previous update cycle. " + + "Perhaps reset during failure. If context not indicative use ***CAUTION***"); + } + LOG.exiting("TaskNodeStatusImpl", "processNeighborAcks", getQualifiedName() + gcm); + } + + private void checkNeighborActiveToSendTopoSetup(final String sourceId) { + LOG.entering("TaskNodeStatusImpl", "checkNeighborActiveToSendTopoSetup", new Object[]{getQualifiedName(), + sourceId}); + if (statusMap.notContains(sourceId)) { + //All msgs corresponding to sourceId have been ACKed + if (neighborStatus.get(sourceId) > 0) { + activeNeighbors.add(sourceId); + node.checkAndSendTopologySetupMessageFor(sourceId); + } else { + LOG.finest(getQualifiedName() + sourceId + " is not a neighbor anymore"); + } + } else { + LOG.finest(getQualifiedName() + "Not done processing " + sourceId + " acks yet. 
So it is still inactive"); + } + LOG.exiting("TaskNodeStatusImpl", "checkNeighborActiveToSendTopoSetup", getQualifiedName() + sourceId); + } + + private void updateNeighborStatus(final Type msgAcked, final String sourceId) { + LOG.entering("TaskNodeStatusImpl", "updateNeighborStatus", new Object[]{getQualifiedName(), msgAcked, sourceId}); + if (isAddMsg(msgAcked)) { + neighborStatus.add(sourceId); + } else if (isDeadMsg(msgAcked)) { + neighborStatus.remove(sourceId); + } else { + throw new RuntimeException("Can only deal with Neigbor ACKs while I received " + msgAcked + " from " + sourceId); + } + LOG.exiting("TaskNodeStatusImpl", "updateNeighborStatus", new Object[]{getQualifiedName(), msgAcked, sourceId}); + } + + @Override + public void updatingTopology() { + LOG.entering("TaskNodeStatusImpl", "updatingTopology", getQualifiedName()); + final boolean topoBeingUpdated = !updatingTopo.compareAndSet(false, true); + if (topoBeingUpdated) { + throw new RuntimeException(getQualifiedName() + "Was expecting updateTopo to be false but it was true"); + } + LOG.exiting("TaskNodeStatusImpl", "updatingTopology", getQualifiedName()); + } + + private String getQualifiedName() { + return Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + ":(" + taskId + "," + node.getVersion() + ") - "; + } + + @Override + public boolean hasChanges() { + LOG.entering("TaskNodeStatusImpl", "hasChanges", getQualifiedName()); + final boolean notEmpty = !statusMap.isEmpty(); + LOG.exiting("TaskNodeStatusImpl", "hasChanges", getQualifiedName() + notEmpty); + return notEmpty; + } + + @Override + public void waitForTopologySetup() { + LOG.entering("TaskNodeStatusImpl", "waitForTopologySetup", getQualifiedName()); + LOG.finest("Waiting to acquire topoUpdateStageLock"); + synchronized (topoUpdateStageLock) { + LOG.finest(getQualifiedName() + "Acquired topoUpdateStageLock. 
updatingTopo: " + updatingTopo.get()); + while (updatingTopo.get() && node.isRunning()) { + try { + LOG.finest(getQualifiedName() + "Waiting on topoUpdateStageLock"); + topoUpdateStageLock.wait(); + } catch (final InterruptedException e) { + throw new RuntimeException("InterruptedException in NodeTopologyUpdateWaitStage " + + "while waiting for receiving TopologySetup", e); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskState.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskState.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskState.java new file mode 100644 index 0000000..8404d89 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TaskState.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.io.network.group.impl.driver; + +public enum TaskState { + NOT_STARTED, RUNNING, FAILED; +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyFailedEvaluatorHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyFailedEvaluatorHandler.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyFailedEvaluatorHandler.java new file mode 100644 index 0000000..228cf4b --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyFailedEvaluatorHandler.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
package org.apache.reef.io.network.group.impl.driver;

import org.apache.reef.driver.evaluator.FailedEvaluator;
import org.apache.reef.wake.EventHandler;

import java.util.logging.Logger;

/**
 * Driver-side handler for {@link FailedEvaluator} events.
 *
 * <p>If the failed evaluator reports a failed task, that task is first marked as failed and then
 * removed through the {@link CommunicationGroupDriverImpl}. Evaluator failures without a task are
 * only traced.
 */
public class TopologyFailedEvaluatorHandler implements EventHandler<FailedEvaluator> {

  private static final Logger LOG = Logger.getLogger(TopologyFailedEvaluatorHandler.class.getName());

  /** Source-class label used in the entering/exiting trace records. */
  private static final String TRACE_CLASS = "TopologyFailedEvaluatorHandler";

  private final CommunicationGroupDriverImpl communicationGroupDriverImpl;

  /**
   * @param communicationGroupDriverImpl group driver that is told about tasks lost with the evaluator
   */
  public TopologyFailedEvaluatorHandler(final CommunicationGroupDriverImpl communicationGroupDriverImpl) {
    this.communicationGroupDriverImpl = communicationGroupDriverImpl;
  }

  /**
   * Fails and then removes the task that was running on the failed evaluator, if there was one.
   *
   * @param failedEvaluator the evaluator failure event
   */
  @Override
  public void onNext(final FailedEvaluator failedEvaluator) {
    final String evaluatorId = failedEvaluator.getId();
    LOG.entering(TRACE_CLASS, "onNext", evaluatorId);

    final boolean taskWasLost = failedEvaluator.getFailedTask().isPresent();
    if (taskWasLost) {
      final String lostTaskId = failedEvaluator.getFailedTask().get().getId();
      LOG.finest("Failed Evaluator contains a failed task: " + lostTaskId);
      // The task is marked as failed before it is removed from the group.
      communicationGroupDriverImpl.failTask(lostTaskId);
      communicationGroupDriverImpl.removeTask(lostTaskId);
    }

    LOG.exiting(TRACE_CLASS, "onNext", evaluatorId);
  }
}
package org.apache.reef.io.network.group.impl.driver;

import org.apache.reef.driver.task.FailedTask;
import org.apache.reef.wake.EventHandler;

import java.util.logging.Logger;

/**
 * Driver-side handler that forwards {@link FailedTask} events to the
 * {@link CommunicationGroupDriverImpl}, which marks the task as failed.
 */
public class TopologyFailedTaskHandler implements EventHandler<FailedTask> {

  private static final Logger LOG = Logger.getLogger(TopologyFailedTaskHandler.class.getName());

  /** Source-class label used in the entering/exiting trace records. */
  private static final String TRACE_CLASS = "TopologyFailedTaskHandler";

  private final CommunicationGroupDriverImpl communicationGroupDriverImpl;

  /**
   * @param communicationGroupDriverImpl group driver that receives the failure notification
   */
  public TopologyFailedTaskHandler(final CommunicationGroupDriverImpl communicationGroupDriverImpl) {
    this.communicationGroupDriverImpl = communicationGroupDriverImpl;
  }

  /**
   * Marks the task that produced this event as failed.
   *
   * @param failedTask the task failure event
   */
  @Override
  public void onNext(final FailedTask failedTask) {
    final String taskId = failedTask.getId();
    LOG.entering(TRACE_CLASS, "onNext", taskId);
    communicationGroupDriverImpl.failTask(taskId);
    LOG.exiting(TRACE_CLASS, "onNext", taskId);
  }
}
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyMessageHandler.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyMessageHandler.java new file mode 100644 index 0000000..e651aa7 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/TopologyMessageHandler.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
package org.apache.reef.io.network.group.impl.driver;

import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
import org.apache.reef.wake.EventHandler;

import java.util.logging.Logger;

/**
 * Driver-side handler that hands every {@link GroupCommunicationMessage} received from tasks to
 * {@link CommunicationGroupDriverImpl#processMsg}.
 */
public class TopologyMessageHandler implements EventHandler<GroupCommunicationMessage> {

  private static final Logger LOG = Logger.getLogger(TopologyMessageHandler.class.getName());

  /** Source-class label used in the entering/exiting trace records. */
  private static final String TRACE_CLASS = "TopologyMessageHandler";

  private final CommunicationGroupDriverImpl communicationGroupDriverImpl;

  /**
   * @param communicationGroupDriverImpl group driver that processes the incoming messages
   */
  public TopologyMessageHandler(final CommunicationGroupDriverImpl communicationGroupDriverImpl) {
    this.communicationGroupDriverImpl = communicationGroupDriverImpl;
  }

  /**
   * Delegates the message to the group driver.
   *
   * @param msg message received from a task
   */
  @Override
  public void onNext(final GroupCommunicationMessage msg) {
    LOG.entering(TRACE_CLASS, "onNext", msg);
    communicationGroupDriverImpl.processMsg(msg);
    LOG.exiting(TRACE_CLASS, "onNext", msg);
  }
}
package org.apache.reef.io.network.group.impl.driver;

import org.apache.reef.driver.task.RunningTask;
import org.apache.reef.wake.EventHandler;

import java.util.logging.Logger;

/**
 * Driver-side handler that reports {@link RunningTask} events to the
 * {@link CommunicationGroupDriverImpl} through {@code runTask}.
 */
public class TopologyRunningTaskHandler implements EventHandler<RunningTask> {

  private static final Logger LOG = Logger.getLogger(TopologyRunningTaskHandler.class.getName());

  /** Source-class label used in the entering/exiting trace records. */
  private static final String TRACE_CLASS = "TopologyRunningTaskHandler";

  private final CommunicationGroupDriverImpl communicationGroupDriverImpl;

  /**
   * @param communicationGroupDriverImpl group driver that is told which task is now running
   */
  public TopologyRunningTaskHandler(final CommunicationGroupDriverImpl communicationGroupDriverImpl) {
    this.communicationGroupDriverImpl = communicationGroupDriverImpl;
  }

  /**
   * Notifies the group driver that the task in this event is running.
   *
   * @param runningTask the running-task event
   */
  @Override
  public void onNext(final RunningTask runningTask) {
    final String taskId = runningTask.getId();
    LOG.entering(TRACE_CLASS, "onNext", taskId);
    communicationGroupDriverImpl.runTask(taskId);
    LOG.exiting(TRACE_CLASS, "onNext", taskId);
  }
}
package org.apache.reef.io.network.group.impl.driver;

import org.apache.reef.io.network.group.api.driver.TaskNode;
import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
import org.apache.reef.io.network.group.impl.utils.Utils;
import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.wake.EStage;
import org.apache.reef.wake.EventHandler;

import java.util.List;
import java.util.logging.Logger;

/**
 * Waits until every given {@link TaskNode} has either entered the TopologyUpdate phase
 * (received TopologySetup) or failed. It then sends a TopologyUpdated message to the task that
 * requested the update.
 *
 * <p>Local topology changes are not in effect until the driver sends TopologySetup, which
 * happens once the node's status map is emptied. Operations on tasks that have topology changes
 * wait for this. Tasks without changes continue their regular operation.
 */
public class TopologyUpdateWaitHandler implements EventHandler<List<TaskNode>> {

  private static final Logger LOG = Logger.getLogger(TopologyUpdateWaitHandler.class.getName());
  private final EStage<GroupCommunicationMessage> senderStage;
  private final Class<? extends Name<String>> groupName;
  private final Class<? extends Name<String>> operName;
  private final String driverId;
  private final int driverVersion;
  private final String dstId;
  private final int dstVersion;
  private final String qualifiedName;

  /**
   * @param senderStage   stage used to send the final TopologyUpdated message
   * @param groupName     communication group the message belongs to
   * @param operName      operator the message belongs to
   * @param driverId      source id of the TopologyUpdated message
   * @param driverVersion source version of the TopologyUpdated message
   * @param dstId         id of the task that requested the topology update
   * @param dstVersion    version of the requesting task
   * @param qualifiedName prefix prepended to every log message
   */
  public TopologyUpdateWaitHandler(final EStage<GroupCommunicationMessage> senderStage,
                                   final Class<? extends Name<String>> groupName,
                                   final Class<? extends Name<String>> operName,
                                   final String driverId, final int driverVersion,
                                   final String dstId, final int dstVersion,
                                   final String qualifiedName) {
    this.senderStage = senderStage;
    this.groupName = groupName;
    this.operName = operName;
    this.driverId = driverId;
    this.driverVersion = driverVersion;
    this.dstId = dstId;
    this.dstVersion = dstVersion;
    this.qualifiedName = qualifiedName;
  }

  /**
   * Waits on each node in turn, then notifies the requesting task that the topology has been
   * updated. The notification is sent even if some of the nodes failed while waiting.
   *
   * @param nodes nodes that were asked to update their topology
   */
  @Override
  public void onNext(final List<TaskNode> nodes) {
    LOG.entering("TopologyUpdateWaitHandler", "onNext", new Object[]{qualifiedName, nodes});

    for (final TaskNode node : nodes) {
      LOG.fine(qualifiedName + "Waiting for " + node + " to enter TopologyUpdate phase");
      // Returns once the node has received TopologySetup or is no longer running.
      node.waitForTopologySetupOrFailure();
      if (node.isRunning()) {
        LOG.fine(qualifiedName + node + " is in TopologyUpdate phase");
      } else {
        LOG.fine(qualifiedName + node + " has failed");
      }
    }
    LOG.finest(qualifiedName + "NodeTopologyUpdateWaitStage All to be updated nodes have received TopologySetup");
    LOG.fine(qualifiedName + "All affected parts of the topology are in TopologyUpdate phase. Will send a note to ("
        + dstId + "," + dstVersion + ")");
    senderStage.onNext(Utils.bldVersionedGCM(groupName, operName,
        ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyUpdated, driverId, driverVersion, dstId,
        dstVersion, Utils.EmptyByteArr));
    LOG.exiting("TopologyUpdateWaitHandler", "onNext", qualifiedName);
  }

}
package org.apache.reef.io.network.group.impl.driver;

import org.apache.reef.io.network.group.api.operators.GroupCommOperator;
import org.apache.reef.io.network.group.api.GroupChanges;
import org.apache.reef.io.network.group.api.config.OperatorSpec;
import org.apache.reef.io.network.group.api.driver.TaskNode;
import org.apache.reef.io.network.group.api.driver.Topology;
import org.apache.reef.io.network.group.impl.GroupChangesCodec;
import org.apache.reef.io.network.group.impl.GroupChangesImpl;
import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
import org.apache.reef.io.network.group.impl.config.BroadcastOperatorSpec;
import org.apache.reef.io.network.group.impl.config.ReduceOperatorSpec;
import org.apache.reef.io.network.group.impl.config.parameters.DataCodec;
import org.apache.reef.io.network.group.impl.config.parameters.ReduceFunctionParam;
import org.apache.reef.io.network.group.impl.config.parameters.TaskVersion;
import org.apache.reef.io.network.group.impl.operators.BroadcastReceiver;
import org.apache.reef.io.network.group.impl.operators.BroadcastSender;
import org.apache.reef.io.network.group.impl.operators.ReduceReceiver;
import org.apache.reef.io.network.group.impl.operators.ReduceSender;
import org.apache.reef.io.network.group.impl.utils.Utils;
import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
import org.apache.reef.io.serialization.Codec;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.JavaConfigurationBuilder;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.formats.AvroConfigurationSerializer;
import org.apache.reef.tang.formats.ConfigurationSerializer;
import org.apache.reef.wake.EStage;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.impl.SingleThreadStage;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.logging.Logger;

/**
 * Implements a tree topology with the specified fan-out.
 *
 * <p>Non-root nodes are attached as children of {@code logicalRoot}. Once that node has
 * {@code fanOut} children, {@code logicalRoot} advances to {@code logicalRoot.successor()}.
 * Nodes are linked in attachment order through {@code setSibling}. This presumably makes
 * {@code successor()} walk the tree level by level, but the exact semantics are defined by the
 * TaskNode implementation (verify there).
 */
public class TreeTopology implements Topology {

  private static final Logger LOG = Logger.getLogger(TreeTopology.class.getName());

  private final EStage<GroupCommunicationMessage> senderStage;
  private final Class<? extends Name<String>> groupName;
  private final Class<? extends Name<String>> operName;
  private final String driverId;
  private String rootId;
  private OperatorSpec operatorSpec;

  /** Root node, or null while the root task is not part of the topology. */
  private TaskNode root;
  /** Node that new children are attached to, or null while there is no root. */
  private TaskNode logicalRoot;
  /** Last node added; addTaskNode links it to the next attached node via setSibling. */
  private TaskNode prev;
  private final int fanOut;

  private final ConcurrentMap<String, TaskNode> nodes = new ConcurrentSkipListMap<>();
  private final ConfigurationSerializer confSer = new AvroConfigurationSerializer();

  /**
   * @param senderStage   stage used to send messages to tasks
   * @param groupName     communication group this topology belongs to
   * @param operatorName  operator this topology is built for
   * @param driverId      id used as the source of driver messages
   * @param numberOfTasks currently not used by this implementation
   * @param fanOut        maximum number of children per node
   */
  public TreeTopology(final EStage<GroupCommunicationMessage> senderStage,
                      final Class<? extends Name<String>> groupName,
                      final Class<? extends Name<String>> operatorName,
                      final String driverId, final int numberOfTasks, final int fanOut) {
    this.senderStage = senderStage;
    this.groupName = groupName;
    this.operName = operatorName;
    this.driverId = driverId;
    this.fanOut = fanOut;
    LOG.config(getQualifiedName() + "Tree Topology running with a fan-out of " + fanOut);
  }

  @Override
  public void setRootTask(final String rootId) {
    LOG.entering("TreeTopology", "setRootTask", new Object[]{getQualifiedName(), rootId});
    this.rootId = rootId;
    LOG.exiting("TreeTopology", "setRootTask", getQualifiedName() + rootId);
  }

  @Override
  public String getRootId() {
    LOG.entering("TreeTopology", "getRootId", getQualifiedName());
    LOG.exiting("TreeTopology", "getRootId", getQualifiedName() + rootId);
    return rootId;
  }

  @Override
  public void setOperatorSpecification(final OperatorSpec spec) {
    LOG.entering("TreeTopology", "setOperSpec", new Object[]{getQualifiedName(), spec});
    this.operatorSpec = spec;
    LOG.exiting("TreeTopology", "setOperSpec", getQualifiedName() + spec);
  }

  /**
   * Builds the operator configuration for a task. The data codec and task version are always
   * bound. The operator implementation (sender/receiver) is bound according to the task's role
   * in the broadcast or reduce specification.
   *
   * @throws RuntimeException if the task is not part of this topology
   */
  @Override
  public Configuration getTaskConfiguration(final String taskId) {
    LOG.entering("TreeTopology", "getTaskConfig", new Object[]{getQualifiedName(), taskId});
    final TaskNode taskNode = nodes.get(taskId);
    if (taskNode == null) {
      throw new RuntimeException(getQualifiedName() + taskId + " does not exist");
    }

    final int version = getNodeVersion(taskId);
    final JavaConfigurationBuilder jcb = Tang.Factory.getTang().newConfigurationBuilder();
    jcb.bindNamedParameter(DataCodec.class, operatorSpec.getDataCodecClass());
    jcb.bindNamedParameter(TaskVersion.class, Integer.toString(version));
    if (operatorSpec instanceof BroadcastOperatorSpec) {
      final BroadcastOperatorSpec broadcastOperatorSpec = (BroadcastOperatorSpec) operatorSpec;
      if (taskId.equals(broadcastOperatorSpec.getSenderId())) {
        jcb.bindImplementation(GroupCommOperator.class, BroadcastSender.class);
      } else {
        jcb.bindImplementation(GroupCommOperator.class, BroadcastReceiver.class);
      }
    } else if (operatorSpec instanceof ReduceOperatorSpec) {
      final ReduceOperatorSpec reduceOperatorSpec = (ReduceOperatorSpec) operatorSpec;
      jcb.bindNamedParameter(ReduceFunctionParam.class, reduceOperatorSpec.getRedFuncClass());
      if (taskId.equals(reduceOperatorSpec.getReceiverId())) {
        jcb.bindImplementation(GroupCommOperator.class, ReduceReceiver.class);
      } else {
        jcb.bindImplementation(GroupCommOperator.class, ReduceSender.class);
      }
    }
    final Configuration retConf = jcb.build();
    LOG.exiting("TreeTopology", "getTaskConfig", getQualifiedName() + confSer.toString(retConf));
    return retConf;
  }

  /**
   * @throws RuntimeException if the task is not part of this topology
   */
  @Override
  public int getNodeVersion(final String taskId) {
    LOG.entering("TreeTopology", "getNodeVersion", new Object[]{getQualifiedName(), taskId});
    final TaskNode node = nodes.get(taskId);
    if (node == null) {
      throw new RuntimeException(getQualifiedName() + taskId + " is not available on the nodes map");
    }
    final int version = node.getVersion();
    LOG.exiting("TreeTopology", "getNodeVersion", getQualifiedName() + " " + taskId + " " + version);
    return version;
  }

  @Override
  public void removeTask(final String taskId) {
    LOG.entering("TreeTopology", "removeTask", new Object[]{getQualifiedName(), taskId});
    if (!nodes.containsKey(taskId)) {
      LOG.fine("Trying to remove a non-existent node in the task graph");
      LOG.exiting("TreeTopology", "removeTask", getQualifiedName());
      return;
    }
    if (taskId.equals(rootId)) {
      unsetRootNode(taskId);
    } else {
      removeChild(taskId);
    }
    LOG.exiting("TreeTopology", "removeTask", getQualifiedName() + taskId);
  }

  @Override
  public void addTask(final String taskId) {
    LOG.entering("TreeTopology", "addTask", new Object[]{getQualifiedName(), taskId});
    if (nodes.containsKey(taskId)) {
      LOG.fine("Got a request to add a task that is already in the graph. "
          + "We need to block this request till the delete finishes. ***CAUTION***");
    }

    if (taskId.equals(rootId)) {
      setRootNode(taskId);
    } else {
      addChild(taskId);
    }
    prev = nodes.get(taskId);
    LOG.exiting("TreeTopology", "addTask", getQualifiedName() + taskId);
  }

  /**
   * Creates a non-root node. It is attached to the tree only if a root currently exists.
   * Otherwise it is just recorded, and setRootNode attaches it later.
   */
  private void addChild(final String taskId) {
    LOG.entering("TreeTopology", "addChild", new Object[]{getQualifiedName(), taskId});
    LOG.finest(getQualifiedName() + "Adding leaf " + taskId);
    final TaskNode node = new TaskNodeImpl(senderStage, groupName, operName, taskId, driverId, false);
    if (logicalRoot != null) {
      addTaskNode(node);
    }
    nodes.put(taskId, node);
    LOG.exiting("TreeTopology", "addChild", getQualifiedName() + taskId);
  }

  /** Attaches a node under logicalRoot, advancing logicalRoot once it reaches fanOut children. */
  private void addTaskNode(final TaskNode node) {
    LOG.entering("TreeTopology", "addTaskNode", new Object[]{getQualifiedName(), node});
    if (logicalRoot.getNumberOfChildren() >= this.fanOut) {
      logicalRoot = logicalRoot.successor();
    }
    node.setParent(logicalRoot);
    logicalRoot.addChild(node);
    prev.setSibling(node);
    LOG.exiting("TreeTopology", "addTaskNode", getQualifiedName() + node);
  }

  private void removeChild(final String taskId) {
    LOG.entering("TreeTopology", "removeChild", new Object[]{getQualifiedName(), taskId});
    // NOTE(review): only the root's child list is updated. With fanOut > 1, nodes below the first
    // level have a non-root parent that still references the removed node. Confirm whether
    // TaskNode exposes its parent so the node can be detached from the right place.
    if (root != null) {
      root.removeChild(nodes.get(taskId));
    }
    nodes.remove(taskId);
    LOG.exiting("TreeTopology", "removeChild", getQualifiedName() + taskId);
  }

  /** Creates the root node and attaches every node recorded so far beneath it. */
  private void setRootNode(final String rootId) {
    LOG.entering("TreeTopology", "setRootNode", new Object[]{getQualifiedName(), rootId});
    final TaskNode node = new TaskNodeImpl(senderStage, groupName, operName, rootId, driverId, true);
    this.root = node;
    this.logicalRoot = this.root;
    this.prev = this.root;

    // NOTE(review): if an entry for rootId is still present (the "***CAUTION***" re-add path in
    // addTask), it is attached here as a child of the new root before being overwritten below.
    for (final Map.Entry<String, TaskNode> nodeEntry : nodes.entrySet()) {
      final TaskNode leaf = nodeEntry.getValue();
      addTaskNode(leaf);
      this.prev = leaf;
    }
    nodes.put(rootId, root);
    LOG.exiting("TreeTopology", "setRootNode", getQualifiedName() + rootId);
  }

  /** Removes the root node, detaches the remaining nodes from their parents and forgets the tree. */
  private void unsetRootNode(final String taskId) {
    LOG.entering("TreeTopology", "unsetRootNode", new Object[]{getQualifiedName(), taskId});
    nodes.remove(rootId);

    for (final TaskNode leaf : nodes.values()) {
      leaf.setParent(null);
    }
    // Drop every reference into the removed tree. Without this, addChild would keep attaching
    // new tasks to the stale root/logicalRoot, and setRootNode would then attach them a second
    // time when the root task is added again. With no root, addChild only records the node.
    root = null;
    logicalRoot = null;
    prev = null;
    // NOTE(review): only parent links are cleared here. Child lists held by interior nodes of the
    // old tree do not appear to be reset before setRootNode rebuilds the tree. Verify in TaskNodeImpl.
    LOG.exiting("TreeTopology", "unsetRootNode", getQualifiedName() + taskId);
  }

  /**
   * @throws RuntimeException if the task is not part of this topology
   */
  @Override
  public void onFailedTask(final String taskId) {
    LOG.entering("TreeTopology", "onFailedTask", new Object[]{getQualifiedName(), taskId});
    final TaskNode taskNode = nodes.get(taskId);
    if (taskNode == null) {
      throw new RuntimeException(getQualifiedName() + taskId + " does not exist");
    }
    taskNode.onFailedTask();
    LOG.exiting("TreeTopology", "onFailedTask", getQualifiedName() + taskId);
  }

  /**
   * @throws RuntimeException if the task is not part of this topology
   */
  @Override
  public void onRunningTask(final String taskId) {
    LOG.entering("TreeTopology", "onRunningTask", new Object[]{getQualifiedName(), taskId});
    final TaskNode taskNode = nodes.get(taskId);
    if (taskNode == null) {
      throw new RuntimeException(getQualifiedName() + taskId + " does not exist");
    }
    taskNode.onRunningTask();
    LOG.exiting("TreeTopology", "onRunningTask", getQualifiedName() + taskId);
  }

  /**
   * Dispatches a task message. TopologyChanges and UpdateTopology requests are handled here.
   * Every other message type is treated as an acknowledgement for the sending node.
   *
   * @throws RuntimeException if an acknowledgement comes from a task not in this topology
   */
  @Override
  public void onReceiptOfMessage(final GroupCommunicationMessage msg) {
    LOG.entering("TreeTopology", "onReceiptOfMessage", new Object[]{getQualifiedName(), msg});
    switch (msg.getType()) {
    case TopologyChanges:
      onTopologyChanges(msg);
      break;
    case UpdateTopology:
      onUpdateTopology(msg);
      break;

    default:
      onAcknowledgement(msg);
      break;
    }
    LOG.exiting("TreeTopology", "onReceiptOfMessage", getQualifiedName() + msg);
  }

  /** Routes an acknowledgement to the node of its sender, failing clearly if it is unknown. */
  private void onAcknowledgement(final GroupCommunicationMessage msg) {
    final TaskNode srcNode = nodes.get(msg.getSrcid());
    if (srcNode == null) {
      throw new RuntimeException(getQualifiedName() + msg.getSrcid() + " does not exist");
    }
    srcNode.onReceiptOfAcknowledgement(msg);
  }

  /**
   * Asks every running node with pending changes to update its topology. A
   * TopologyUpdateWaitHandler on its own stage then notifies the requester once those nodes are
   * in the TopologyUpdate phase (or have failed).
   */
  private void onUpdateTopology(final GroupCommunicationMessage msg) {
    LOG.entering("TreeTopology", "onUpdateTopology", new Object[]{getQualifiedName(), msg});
    LOG.fine(getQualifiedName() + "Update affected parts of Topology");
    final String dstId = msg.getSrcid();
    final int version = getNodeVersion(dstId);

    LOG.finest(getQualifiedName() + "Creating NodeTopologyUpdateWaitStage to wait on nodes to be updated");
    final EventHandler<List<TaskNode>> topoUpdateWaitHandler = new TopologyUpdateWaitHandler(senderStage, groupName,
        operName, driverId, 0,
        dstId, version,
        getQualifiedName());
    // NOTE(review): a new SingleThreadStage is created for every UpdateTopology request and is
    // never closed here, which may leak one worker thread per request. Closing it straight after
    // onNext could interrupt the waiting handler. The stage should instead be closed after the
    // handler finishes (or replaced by a shared stage). TODO.
    final EStage<List<TaskNode>> nodeTopologyUpdateWaitStage = new SingleThreadStage<>("NodeTopologyUpdateWaitStage",
        topoUpdateWaitHandler,
        nodes.size());

    final List<TaskNode> toBeUpdatedNodes = new ArrayList<>(nodes.size());
    LOG.finest(getQualifiedName() + "Checking which nodes need to be updated");
    for (final TaskNode node : nodes.values()) {
      if (node.isRunning() && node.hasChanges() && node.resetTopologySetupSent()) {
        toBeUpdatedNodes.add(node);
      }
    }
    for (final TaskNode node : toBeUpdatedNodes) {
      node.updatingTopology();
      LOG.fine(getQualifiedName() + "Asking " + node + " to UpdateTopology");
      senderStage.onNext(Utils.bldVersionedGCM(groupName, operName,
          ReefNetworkGroupCommProtos.GroupCommMessage.Type.UpdateTopology, driverId, 0, node.getTaskId(),
          node.getVersion(), Utils.EmptyByteArr));
    }
    nodeTopologyUpdateWaitStage.onNext(toBeUpdatedNodes);
    LOG.exiting("TreeTopology", "onUpdateTopology", getQualifiedName() + msg);
  }

  /**
   * Tells the requesting task whether the topology has changed. It has changed if any node is
   * not running or has pending changes.
   */
  private void onTopologyChanges(final GroupCommunicationMessage msg) {
    LOG.entering("TreeTopology", "onTopologyChanges", new Object[]{getQualifiedName(), msg});
    LOG.fine(getQualifiedName() + "Check TopologyChanges");
    final String dstId = msg.getSrcid();
    boolean hasTopologyChanged = false;
    LOG.finest(getQualifiedName() + "Checking which nodes need to be updated");
    for (final TaskNode node : nodes.values()) {
      if (!node.isRunning() || node.hasChanges()) {
        hasTopologyChanged = true;
        break;
      }
    }
    final GroupChanges changes = new GroupChangesImpl(hasTopologyChanged);
    final Codec<GroupChanges> changesCodec = new GroupChangesCodec();
    LOG.fine(getQualifiedName() + "TopologyChanges: " + changes);
    senderStage.onNext(Utils.bldVersionedGCM(groupName, operName,
        ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyChanges, driverId, 0, dstId,
        getNodeVersion(dstId), changesCodec.encode(changes)));
    LOG.exiting("TreeTopology", "onTopologyChanges", getQualifiedName() + msg);
  }

  private String getQualifiedName() {
    return Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + " - ";
  }
}
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * This package contains the implementation of the driver side of the + * Group Communication Service using the tree/flat topology. The Service + * can be configured with many named Communication Groups and each + * Communication Group can be configured with many named operators. This + * configuration is typically done on the GroupCommDriver object injected + * into the driver. During this specification only the root nodes are + * specified. + * + * After the GroupCommDriver is configured, the driver would want to submit + * tasks with the Communication Groups & their operators configured. To do + * that, the user has to create a partial task configuration containing their + * own set of configurations and add it to the relevant communication group. + * Based on whether the given task is a Master or a Slave, different roles for + * the operators are configured. Once added, the final Configuration containing + * operators and their roles encoded can be obtained by the + * CommunicationGroupDriver.getConfiguration() call. The topology is complete + * once the minimum number of tasks needed for the group to function has been + * added. + * + * The initial configuration handed out by the service + * creates a bunch of tasks that are not connected. Connections are + * established when the Tasks start running. Each operator defines its own + * topology and can have potentially different root nodes. Each node in the + * topology, called a TaskNode, is a logical representation of a running Task. + * Adding a task to a group creates its TaskNode with TaskState NOT_STARTED. + * + * The driver side of the service plays a major role in setting up the + * topology and making sure that the topology is set up only when the parties + * involved are ready to participate in the communication. A topology will + * not contain parties that are not active.
Active status is given to parties + * who have acknowledged the presence of their neighbors and who have been + * acknowledged by their neighbors as present. The connection between two + * parties is initiated by the driver and the driver then expects the parties + * to ACK that their end of the connection has been set up. Once a party has + * ACKed its end of all connections and all its neighbors have ACKed the outgoing part + * of their connection to this party, the driver sends a TopologySetup msg to + * indicate that the topology is usable by this party now. The driver also + * listens in on failure events and appropriately updates its state so that + * it does not wait for ACKs from failed tasks. + * + * There are two chains of control: + * 1. Driver Events (Running/Failed Tasks) + * 2. Tasks (Msgs sent by Task side of Group Communication Service) + * + * All events and msgs are funneled through the CommunicationGroupDriver so + * that all the topologies belonging to different operators configured on the + * group are in sync. Without this there is a possibility of a deadlock between + * the tasks and the driver. So there is no finer-grained locking other than that + * in the CommunicationGroupDriver. + * + * 1. Driver Events + * These are routed to all communication groups and each communication group + * routes it to all topologies. The topology will then route this event to the + * corresponding TaskNode which will process that event. When a task starts + * running it is notified of its running neighbors and the running neighbors + * are notified that it is running. The TaskNodeStatus object keeps track of the + * state of the local topology for this TaskNode: which msgs have been sent to + * this node that need to be ACKed, the status of its neighbors, and whether + * this TaskNode is ready to accept data from a neighboring TaskNode when we + * ask the neighbor to check if it was only waiting for this TaskNode to ACK + * in order to send TopologySetup.
So when we are sending (Parent|Child)(Add|Dead) + * msgs we first note that we expect an ACK back for this. These ACK expectations + * are then deleted if the node fails. Neighbor failures are also updated. + * All the msg sending is done by the TaskNode. The TaskNodeStatus is only a + * state manager to consult on the status of ACKs and neighbors. This is needed + * by the chkAndSendTopSetup logic. These events also send msgs related to failure + * of tasks so that any task in the topology that waited for a response from the + * failed task can move on. + * + * 2. Tasks + * We get ACK msgs from tasks and they update the status of ACK expectations. + * Here the TaskNodeStatus acts as a bridge between the initiation of a link + * between two parties and the final set-up of the link. Once all ACKs have + * been received we ask the TaskNode to check if it is ready to send a + * TopologySetup msg. Every ACK can also trigger the chkAndSendTopSetup for + * a neighbor. + * + * The above concerns the topology set-up and fault notifications. However, the + * other major task that the driver helps with is updating a topology. When + * requested for the update of a Topology using the UpdateTopology msg, the + * driver notifies all the parties that have to update their topologies by + * sending an UpdateTopology msg to the affected parties. The tasks then try to + * enter the UpdateTopology phase and as soon as they can do so (by acquiring a lock) + * they respond that they have done so. The driver will wait till all the affected + * parties do so and then sends the initiator a msg that Topology has been updated. + * The effect of this is that the unaffected regions of the topology can continue + * to work normally while the affected regions are healing. The affected regions + * heal their (local) topologies using the TopologySetup mechanism described above. + * Both update topology and initial set-up use the minimum number of tasks to define + * when the set-up is complete.
+ * + * The CommGroupDriver class also takes care of minor alterations to event ordering + * by the use of locks to coerce the events to occur in a definite way. + * + */ +package org.apache.reef.io.network.group.impl.driver; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastReceiver.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastReceiver.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastReceiver.java new file mode 100644 index 0000000..b8dd425 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastReceiver.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.io.network.group.impl.operators; + +import org.apache.reef.driver.parameters.DriverIdentifier; +import org.apache.reef.driver.task.TaskConfigurationOptions; +import org.apache.reef.exception.evaluator.NetworkException; +import org.apache.reef.io.network.exception.ParentDeadException; +import org.apache.reef.io.network.group.api.operators.Broadcast; +import org.apache.reef.io.network.impl.NetworkService; +import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler; +import org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient; +import org.apache.reef.io.network.group.api.task.OperatorTopology; +import org.apache.reef.io.network.group.impl.GroupCommunicationMessage; +import org.apache.reef.io.network.group.impl.config.parameters.CommunicationGroupName; +import org.apache.reef.io.network.group.impl.config.parameters.DataCodec; +import org.apache.reef.io.network.group.impl.config.parameters.OperatorName; +import org.apache.reef.io.network.group.impl.config.parameters.TaskVersion; +import org.apache.reef.io.network.group.impl.task.OperatorTopologyImpl; +import org.apache.reef.io.network.group.impl.utils.Utils; +import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos; +import org.apache.reef.io.serialization.Codec; +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.wake.EventHandler; + +import javax.inject.Inject; +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Logger; + +public class BroadcastReceiver<T> implements Broadcast.Receiver<T>, EventHandler<GroupCommunicationMessage> { + + private static final Logger LOG = Logger.getLogger(BroadcastReceiver.class.getName()); + + private final Class<? extends Name<String>> groupName; + private final Class<? 
extends Name<String>> operName; + private final CommGroupNetworkHandler commGroupNetworkHandler; + private final Codec<T> dataCodec; + private final NetworkService<GroupCommunicationMessage> netService; + private final Sender sender; + + private final OperatorTopology topology; + + private final AtomicBoolean init = new AtomicBoolean(false); + + private final CommunicationGroupServiceClient commGroupClient; + + private final int version; + + @Inject + public BroadcastReceiver(@Parameter(CommunicationGroupName.class) final String groupName, + @Parameter(OperatorName.class) final String operName, + @Parameter(TaskConfigurationOptions.Identifier.class) final String selfId, + @Parameter(DataCodec.class) final Codec<T> dataCodec, + @Parameter(DriverIdentifier.class) final String driverId, + @Parameter(TaskVersion.class) final int version, + final CommGroupNetworkHandler commGroupNetworkHandler, + final NetworkService<GroupCommunicationMessage> netService, + final CommunicationGroupServiceClient commGroupClient) { + super(); + this.version = version; + LOG.finest(operName + " has CommGroupHandler-" + commGroupNetworkHandler.toString()); + this.groupName = Utils.getClass(groupName); + this.operName = Utils.getClass(operName); + this.dataCodec = dataCodec; + this.commGroupNetworkHandler = commGroupNetworkHandler; + this.netService = netService; + this.sender = new Sender(this.netService); + this.topology = new OperatorTopologyImpl(this.groupName, this.operName, selfId, driverId, sender, version); + this.commGroupNetworkHandler.register(this.operName, this); + this.commGroupClient = commGroupClient; + } + + @Override + public int getVersion() { + return version; + } + + @Override + public void initialize() throws ParentDeadException { + topology.initialize(); + } + + @Override + public Class<? extends Name<String>> getOperName() { + return operName; + } + + @Override + public Class<? 
extends Name<String>> getGroupName() { + return groupName; + } + + @Override + public String toString() { + return "BroadcastReceiver:" + Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + ":" + version; + } + + @Override + public void onNext(final GroupCommunicationMessage msg) { + topology.handle(msg); + } + + @Override + public T receive() throws NetworkException, InterruptedException { + LOG.entering("BroadcastReceiver", "receive", this); + LOG.fine("I am " + this); + + if (init.compareAndSet(false, true)) { + LOG.fine(this + " Communication group initializing"); + commGroupClient.initialize(); + LOG.fine(this + " Communication group initialized"); + } + // I am an intermediate node or leaf. + + final T retVal; + // Wait for parent to send + LOG.fine(this + " Waiting to receive broadcast"); + byte[] data; + try { + data = topology.recvFromParent(); + // TODO: Should receive the identity element instead of null + if (data == null) { + LOG.fine(this + " Received null. Perhaps one of my ancestors is dead."); + retVal = null; + } else { + LOG.finest("Using " + dataCodec.getClass().getSimpleName() + " as codec"); + retVal = dataCodec.decode(data); + LOG.finest("Decoded msg successfully"); + LOG.fine(this + " Received: " + retVal); + LOG.finest(this + " Sending to children."); + } + + topology.sendToChildren(data, ReefNetworkGroupCommProtos.GroupCommMessage.Type.Broadcast); + } catch (final ParentDeadException e) { + throw new RuntimeException("ParentDeadException", e); + } + LOG.exiting("BroadcastReceiver", "receive", Arrays.toString(new Object[]{retVal, this})); + return retVal; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6c6ad336/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastSender.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastSender.java 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastSender.java new file mode 100644 index 0000000..6f146e6 --- /dev/null +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/operators/BroadcastSender.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.io.network.group.impl.operators; + +import org.apache.reef.driver.parameters.DriverIdentifier; +import org.apache.reef.driver.task.TaskConfigurationOptions; +import org.apache.reef.exception.evaluator.NetworkException; +import org.apache.reef.io.network.exception.ParentDeadException; +import org.apache.reef.io.network.group.api.operators.Broadcast; +import org.apache.reef.io.network.impl.NetworkService; +import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler; +import org.apache.reef.io.network.group.api.task.CommunicationGroupServiceClient; +import org.apache.reef.io.network.group.api.task.OperatorTopology; +import org.apache.reef.io.network.group.impl.GroupCommunicationMessage; +import org.apache.reef.io.network.group.impl.config.parameters.CommunicationGroupName; +import org.apache.reef.io.network.group.impl.config.parameters.DataCodec; +import org.apache.reef.io.network.group.impl.config.parameters.OperatorName; +import org.apache.reef.io.network.group.impl.config.parameters.TaskVersion; +import org.apache.reef.io.network.group.impl.task.OperatorTopologyImpl; +import org.apache.reef.io.network.group.impl.utils.Utils; +import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos; +import org.apache.reef.io.serialization.Codec; +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.wake.EventHandler; + +import javax.inject.Inject; +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Logger; + +public class BroadcastSender<T> implements Broadcast.Sender<T>, EventHandler<GroupCommunicationMessage> { + + private static final Logger LOG = Logger.getLogger(BroadcastSender.class.getName()); + + private final Class<? extends Name<String>> groupName; + private final Class<? 
extends Name<String>> operName; + private final CommGroupNetworkHandler commGroupNetworkHandler; + private final Codec<T> dataCodec; + private final NetworkService<GroupCommunicationMessage> netService; + private final Sender sender; + + private final OperatorTopology topology; + + private final AtomicBoolean init = new AtomicBoolean(false); + + private final CommunicationGroupServiceClient commGroupClient; + + private final int version; + + @Inject + public BroadcastSender(@Parameter(CommunicationGroupName.class) final String groupName, + @Parameter(OperatorName.class) final String operName, + @Parameter(TaskConfigurationOptions.Identifier.class) final String selfId, + @Parameter(DataCodec.class) final Codec<T> dataCodec, + @Parameter(DriverIdentifier.class) final String driverId, + @Parameter(TaskVersion.class) final int version, + final CommGroupNetworkHandler commGroupNetworkHandler, + final NetworkService<GroupCommunicationMessage> netService, + final CommunicationGroupServiceClient commGroupClient) { + super(); + this.version = version; + LOG.finest(operName + "has CommGroupHandler-" + commGroupNetworkHandler.toString()); + this.groupName = Utils.getClass(groupName); + this.operName = Utils.getClass(operName); + this.dataCodec = dataCodec; + this.commGroupNetworkHandler = commGroupNetworkHandler; + this.netService = netService; + this.sender = new Sender(this.netService); + this.topology = new OperatorTopologyImpl(this.groupName, this.operName, selfId, driverId, sender, version); + this.commGroupNetworkHandler.register(this.operName, this); + this.commGroupClient = commGroupClient; + } + + @Override + public int getVersion() { + return version; + } + + @Override + public void initialize() throws ParentDeadException { + topology.initialize(); + } + + @Override + public Class<? extends Name<String>> getOperName() { + return operName; + } + + @Override + public Class<? 
extends Name<String>> getGroupName() { + return groupName; + } + + @Override + public String toString() { + return "BroadcastSender:" + Utils.simpleName(groupName) + ":" + Utils.simpleName(operName) + ":" + version; + } + + @Override + public void onNext(final GroupCommunicationMessage msg) { + topology.handle(msg); + } + + @Override + public void send(final T element) throws NetworkException, InterruptedException { + LOG.entering("BroadcastSender", "send", new Object[]{this, element}); + LOG.fine("I am " + this); + + if (init.compareAndSet(false, true)) { + LOG.fine(this + " Communication group initializing"); + commGroupClient.initialize(); + LOG.fine(this + " Communication group initialized"); + } + + try { + LOG.fine(this + " Broadcasting " + element); + topology.sendToChildren(dataCodec.encode(element), ReefNetworkGroupCommProtos.GroupCommMessage.Type.Broadcast); + } catch (final ParentDeadException e) { + throw new RuntimeException("ParentDeadException", e); + } + LOG.exiting("BroadcastSender", "send", Arrays.toString(new Object[]{this, element})); + } + +}
