Additional changes for getting Myriad HA to work * The Myriad Executor + NM (merged) now sends TASK_RUNNING and TASK_FINISHED messages to Mesos for the Mesos tasks corresponding to YARN containers. This is independent of the RM. * The entire ExecutorInfo object for NM tasks is now preserved in and recovered from the state store. This is necessary because Mesos requires all tasks run on the same executor to have identical ExecutorInfo objects. The Myriad Executor + NM (merged) also runs the tasks corresponding to YARN containers, so these tasks must be given the same ExecutorInfo object. That ExecutorInfo object cannot be obtained across an RM restart unless it is preserved in the state store. Made code changes to store the ExecutorInfo in the scheduler state and to serialize/deserialize it to/from the state store. * Made sure that the RM's view of NM capacity is updated correctly after an RM restart. The RM's view is not regenerated atomically, so assumptions that data is already available do not always hold. Fixed a few NullPointerExceptions here.
Testing done * Ran a job with one node using Coarse Grained Scaling (CGS) and one flexed-up node using Fine Grained Scaling (FGS). On completion of the job, killed the RM; the RM relaunched on another node. Deleted the output directory of the first job and executed the same job again. This tests: 1. That the RM successfully recovers the list of NM tasks that it launched before the restart. 2. That the ExecutorInfo is stored in and retrieved from the state store. * Ran a long-running job and killed the RM while the job was running. The RM relaunched on another node and the job continued to make progress. This tests: 1. That the RM successfully recovers the list of NM tasks that it launched before the restart. 2. That the ExecutorInfo is stored in and retrieved from the state store. 3. That the RM recovers and the job makes forward progress. Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/0fa49c26 Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/0fa49c26 Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/0fa49c26 Branch: refs/heads/phase1 Commit: 0fa49c26bc0492ef4b69a85c219eab53f1cc7f0a Parents: 23e01ec Author: Swapnil Daingade <sdaing...@maprtech.com> Authored: Sat Aug 15 05:23:32 2015 -0700 Committer: Swapnil Daingade <sdaing...@maprtech.com> Committed: Sat Aug 29 11:41:33 2015 -0700 ---------------------------------------------------------------------- .../executor/MyriadExecutorAuxService.java | 28 ++++++++- .../handlers/ResourceOffersEventHandler.java | 10 ++++ .../scheduler/fgs/NMHeartBeatHandler.java | 60 ++++---------------- .../scheduler/fgs/YarnNodeCapacityManager.java | 7 ++- .../java/com/ebay/myriad/state/NodeTask.java | 13 +++++ .../com/ebay/myriad/state/SchedulerState.java | 14 ++++- .../myriad/state/utils/ByteBufferSupport.java | 30 ++++++++++ 7 files changed, 109 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/0fa49c26/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java ---------------------------------------------------------------------- diff --git a/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java b/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java index 2c7d87d..a6d126a 100644 --- a/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java +++ b/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java @@ -20,12 +20,17 @@ package com.ebay.myriad.executor; import java.nio.ByteBuffer; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; import org.apache.hadoop.yarn.server.api.AuxiliaryService; +import org.apache.hadoop.yarn.server.api.ContainerTerminationContext; import org.apache.mesos.MesosExecutorDriver; import org.apache.mesos.Protos.Status; +import org.apache.mesos.Protos.TaskState; +import org.apache.mesos.Protos.TaskStatus; +import org.apache.mesos.Protos; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,6 +42,9 @@ public class MyriadExecutorAuxService extends AuxiliaryService { private static final Logger LOGGER = LoggerFactory.getLogger(MyriadExecutor.class); private static final String SERVICE_NAME = "myriad_service"; + public static final String YARN_CONTAINER_TASK_ID_PREFIX = "yarn_"; + + private MesosExecutorDriver driver; protected MyriadExecutorAuxService() { super(SERVICE_NAME); @@ -48,7 +56,7 @@ public class MyriadExecutorAuxService extends AuxiliaryService { new Thread(new Runnable() { public void run() { - MesosExecutorDriver driver = new MesosExecutorDriver(new MyriadExecutor()); + driver = new MesosExecutorDriver(new MyriadExecutor()); LOGGER.error("MyriadExecutor exit 
with status " + Integer.toString(driver.run() == Status.DRIVER_STOPPED ? 0 : 1)); } @@ -72,4 +80,22 @@ public class MyriadExecutorAuxService extends AuxiliaryService { return null; } + @Override + public void stopContainer(ContainerTerminationContext stopContainerContext) { + sendStatus(stopContainerContext.getContainerId(), TaskState.TASK_FINISHED); + } + + private void sendStatus(ContainerId containerId, TaskState taskState) { + Protos.TaskID taskId = Protos.TaskID.newBuilder() + .setValue(YARN_CONTAINER_TASK_ID_PREFIX + containerId.toString()) + .build(); + + TaskStatus status = TaskStatus.newBuilder() + .setTaskId(taskId) + .setState(taskState) + .build(); + driver.sendStatusUpdate(status); + LOGGER.debug("Sent status " + taskState + " for taskId " + taskId); + } + } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/0fa49c26/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java index 51730ac..915bd2f 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java @@ -94,6 +94,7 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv schedulerState.getActiveTasks())) { TaskInfo task = taskFactory.createTask(offer, pendingTaskId, taskToLaunch); + List<OfferID> offerIds = new ArrayList<>(); offerIds.add(offer.getId()); List<TaskInfo> tasks = new ArrayList<>(); @@ -104,6 +105,15 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv driver.launchTasks(offerIds, tasks); launchedTaskId = pendingTaskId; 
+ // TODO (sdaingade) For every NM Task that we launch, we currently + // need to backup the ExecutorInfo for that NM Task in the State Store. + // Without this, we will not be able to launch tasks corresponding to yarn + // containers. This is specially important in case the RM restarts. + if (task.hasExecutor() && taskToLaunch.getExecutorInfo() == null) { + taskToLaunch.setExecutorInfo(task.getExecutor()); + schedulerState.updateStateStore(); + } + taskToLaunch.setHostname(offer.getHostname()); taskToLaunch.setSlaveId(offer.getSlaveId()); offerMatch = true; http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/0fa49c26/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandler.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandler.java index 9fd97ba..47393a4 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandler.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandler.java @@ -1,23 +1,20 @@ package com.ebay.myriad.scheduler.fgs; -import com.ebay.myriad.executor.ContainerTaskStatusRequest; import com.ebay.myriad.scheduler.MyriadDriver; import com.ebay.myriad.scheduler.SchedulerUtils; import com.ebay.myriad.scheduler.TaskFactory; import com.ebay.myriad.scheduler.yarn.interceptor.BaseInterceptor; import com.ebay.myriad.scheduler.yarn.interceptor.InterceptorRegistry; import com.ebay.myriad.state.SchedulerState; -import com.google.gson.Gson; -import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; import javax.inject.Inject; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; 
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; @@ -118,8 +115,10 @@ public class NMHeartBeatHandler extends BaseInterceptor { RMNode rmNode = context.getRMNodes().get(event.getNodeId()); String hostName = rmNode.getNodeID().getHost(); - nodeStore.getNode(hostName).snapshotRunningContainers(); - sendStatusUpdatesToMesosForCompletedContainers(statusEvent); + Node host = nodeStore.getNode(hostName); + if (host != null) { + host.snapshotRunningContainers(); + } // New capacity of the node = // resources under use on the node (due to previous offers) + @@ -155,54 +154,17 @@ public class NMHeartBeatHandler extends BaseInterceptor { Resource usedResources = Resource.newInstance(0, 0); for (ContainerStatus status : statusEvent.getContainers()) { if (status.getState() == ContainerState.NEW || status.getState() == ContainerState.RUNNING) { - Resources.addTo(usedResources, yarnScheduler.getRMContainer(status.getContainerId()).getAllocatedResource()); + RMContainer rmContainer = yarnScheduler.getRMContainer(status.getContainerId()); + // (sdaingade) This check is needed as RMContainer information may not be populated + // immediately after a RM restart. 
+ if (rmContainer != null) { + Resources.addTo(usedResources, rmContainer.getAllocatedResource()); + } } } return usedResources; } - private void sendStatusUpdatesToMesosForCompletedContainers(RMNodeStatusEvent statusEvent) { - // Send task update to Mesos - Protos.SlaveID slaveId = nodeStore.getNode(statusEvent.getNodeId().getHost()).getSlaveId(); - for (ContainerStatus status : statusEvent.getContainers()) { - ContainerId containerId = status.getContainerId(); - if (status.getState() == ContainerState.COMPLETE) { - requestExecutorToSendTaskStatusUpdate(slaveId, containerId, Protos.TaskState.TASK_FINISHED); - } else { // state == NEW | RUNNING - requestExecutorToSendTaskStatusUpdate(slaveId, containerId, Protos.TaskState.TASK_RUNNING); - } - } - } - - - /** - * sends a request to executor on the given slave to send back a status update - * for the mesos task launched for this container. - * - * TODO(Santosh): - * Framework messages are unreliable. Try a NM auxiliary service that can help - * send out the status messages from NM itself. NM and MyriadExecutor would need - * to be merged into a single process. 
- * - * @param slaveId - * @param containerId - * @param taskState - */ - private void requestExecutorToSendTaskStatusUpdate(Protos.SlaveID slaveId, - ContainerId containerId, - Protos.TaskState taskState) { - final String mesosTaskId = ContainerTaskStatusRequest.YARN_CONTAINER_TASK_ID_PREFIX + containerId.toString(); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Sending out framework message requesting the executor to send {} status for task: {}", - taskState.name(), mesosTaskId); - } - ContainerTaskStatusRequest containerTaskStatusRequest = new ContainerTaskStatusRequest(); - containerTaskStatusRequest.setMesosTaskId(mesosTaskId); - containerTaskStatusRequest.setState(taskState.name()); - myriadDriver.getDriver().sendFrameworkMessage(getExecutorId(slaveId), slaveId, - new Gson().toJson(containerTaskStatusRequest).getBytes(Charset.defaultCharset())); - } - private Protos.ExecutorID getExecutorId(Protos.SlaveID slaveId) { return Protos.ExecutorID.newBuilder().setValue( TaskFactory.NMTaskFactoryImpl.EXECUTOR_PREFIX + slaveId.getValue()).build(); http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/0fa49c26/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java index 497b43d..12bbe73 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java @@ -12,6 +12,7 @@ import com.google.common.collect.Sets; import java.util.HashSet; import java.util.List; import java.util.Set; + import javax.inject.Inject; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.NodeId; @@ -196,7 +197,7 @@ public 
class YarnNodeCapacityManager extends BaseInterceptor { public void setNodeCapacity(RMNode rmNode, Resource newCapacity) { rmNode.getTotalCapability().setMemory(newCapacity.getMemory()); rmNode.getTotalCapability().setVirtualCores(newCapacity.getVirtualCores()); - + LOGGER.info("Setting capacity for node {} to {}", rmNode.getHostName(), newCapacity); // updates the scheduler with the new capacity for the NM. // the event is handled by the scheduler asynchronously rmContext.getDispatcher().getEventHandler().handle( @@ -213,10 +214,12 @@ public class YarnNodeCapacityManager extends BaseInterceptor { Protos.TaskID taskId = Protos.TaskID.newBuilder() .setValue(ContainerTaskStatusRequest.YARN_CONTAINER_TASK_ID_PREFIX + container.getId().toString()).build(); + // TODO (sdaingade) Remove ExecutorInfo from the Node object + // as this is now cached in the NodeTask object in scheduler state. Protos.ExecutorInfo executorInfo = node.getExecInfo(); if (executorInfo == null) { executorInfo = Protos.ExecutorInfo.newBuilder( - taskFactory.getExecutorInfoForSlave(offer.getSlaveId(), null)) + state.getNodeTask(offer.getSlaveId()).getExecutorInfo()) .setFrameworkId(offer.getFrameworkId()).build(); node.setExecInfo(executorInfo); } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/0fa49c26/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java index 5b8b87d..8191eed 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java @@ -32,6 +32,11 @@ public class NodeTask { @JsonProperty private Protos.TaskStatus taskStatus; + /** + * Mesos executor for this node. 
+ */ + private Protos.ExecutorInfo executorInfo; + public NodeTask(NMProfile profile) { this.profile = profile; this.hostname = ""; @@ -68,4 +73,12 @@ public class NodeTask { public void setTaskStatus(Protos.TaskStatus taskStatus) { this.taskStatus = taskStatus; } + + public Protos.ExecutorInfo getExecutorInfo() { + return executorInfo; + } + + public void setExecutorInfo(Protos.ExecutorInfo executorInfo) { + this.executorInfo = executorInfo; + } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/0fa49c26/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java index 75503b6..e27e976 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java @@ -31,6 +31,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.mesos.Protos; +import org.apache.mesos.Protos.SlaveID; + import com.ebay.myriad.state.utils.StoreContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; @@ -188,6 +190,16 @@ public class SchedulerState { return activeNodeTasks; } + public NodeTask getNodeTask(SlaveID slaveId) { + for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) { + if (entry.getValue().getSlaveId() != null && + entry.getValue().getSlaveId().equals(slaveId)) { + return entry.getValue(); + } + } + return null; + } + public Set<Protos.TaskID> getStagingTaskIds() { return this.stagingTasks; } @@ -226,7 +238,7 @@ public class SchedulerState { updateStateStore(); } - private void updateStateStore() { + public void updateStateStore() { if (!isMyriadStateStore()) { return; } 
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/0fa49c26/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java index e1081f0..3d8d57e 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java @@ -116,6 +116,12 @@ public class ByteBufferSupport { } else { size += INT_SIZE; } + + if (nt.getExecutorInfo() != null) { + size += nt.getExecutorInfo().getSerializedSize() + INT_SIZE; + } else { + size += INT_SIZE; + } // Allocate and populate the buffer. ByteBuffer bb = createBuffer(size); @@ -123,6 +129,7 @@ public class ByteBufferSupport { putBytes(bb, hostname); putBytes(bb, getSlaveBytes(nt)); putBytes(bb, getTaskBytes(nt)); + putBytes(bb, getExecutorInfoBytes(nt)); // Make sure the buffer is at the beginning bb.rewind(); return bb; @@ -170,6 +177,7 @@ public class ByteBufferSupport { nt.setHostname(toString(bb)); nt.setSlaveId(toSlaveId(bb)); nt.setTaskStatus(toTaskStatus(bb)); + nt.setExecutorInfo(toExecutorInfo(bb)); } return nt; } @@ -182,6 +190,14 @@ public class ByteBufferSupport { } } + public static byte[] getExecutorInfoBytes(NodeTask nt) { + if (nt.getExecutorInfo() != null) { + return nt.getExecutorInfo().toByteArray(); + } else { + return ZERO_BYTES; + } + } + public static byte[] getSlaveBytes(NodeTask nt) { if (nt.getSlaveId() != null) { return nt.getSlaveId().toByteArray(); @@ -272,6 +288,20 @@ public class ByteBufferSupport { } } + public static Protos.ExecutorInfo toExecutorInfo(ByteBuffer bb) { + int size = bb.getInt(); + if (size > 0) { + try { + return Protos.ExecutorInfo.parseFrom(getBytes(bb, size)); + } catch (Exception e) { + throw new 
RuntimeException("ByteBuffer not in expected format," + + " failed to parse ExecutorInfo bytes", e); + } + } else { + return null; + } + } + public static ByteBuffer fillBuffer(byte src[]) { ByteBuffer bb = createBuffer(src.length); bb.put(src);