Send StatusUpdates if checkpointed resources don't exist on the slave. No bug has been observed (yet), but I realized I had forgotten about this in the dynamic reservations patches.
Review: https://reviews.apache.org/r/35433 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/efeb1183 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/efeb1183 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/efeb1183 Branch: refs/heads/master Commit: efeb1183760e4bd9dd73a2a65af16274673a721f Parents: ccf6c25 Author: Michael Park <[email protected]> Authored: Fri Jun 19 12:03:47 2015 -0700 Committer: Benjamin Hindman <[email protected]> Committed: Fri Jun 19 12:03:48 2015 -0700 ---------------------------------------------------------------------- include/mesos/mesos.proto | 1 + src/slave/slave.cpp | 110 ++++++++++++++++++++++++++++++++++------- 2 files changed, 93 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/efeb1183/include/mesos/mesos.proto ---------------------------------------------------------------------- diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto index 8df1211..81008ed 100644 --- a/include/mesos/mesos.proto +++ b/include/mesos/mesos.proto @@ -893,6 +893,7 @@ message TaskStatus { REASON_MASTER_DISCONNECTED = 7; REASON_MEMORY_LIMIT = 8; REASON_RECONCILIATION = 9; + REASON_RESOURCES_UNKNOWN = 18; REASON_SLAVE_DISCONNECTED = 10; REASON_SLAVE_REMOVED = 11; REASON_SLAVE_RESTARTED = 12; http://git-wip-us.apache.org/repos/asf/mesos/blob/efeb1183/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index 6c539b5..19b7508 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -1332,6 +1332,16 @@ void Slave::_runTask( framework->pending[executorId].erase(task.task_id()); if (framework->pending[executorId].empty()) { framework->pending.erase(executorId); + // NOTE: Ideally we would perform the following check here: + // + // if (framework->executors.empty() && 
+ // framework->pending.empty()) { + // removeFramework(framework); + // } + // + // However, we need 'framework' to stay valid for the rest of + // this function. As such, we perform the check before each of + // the 'return' statements below. } } else { LOG(WARNING) << "Ignoring run task " << task.task_id() @@ -1347,9 +1357,12 @@ void Slave::_runTask( << " of framework " << frameworkId << " because the framework is terminating"; + // Refer to the comment after 'framework->pending.erase' above + // for why we need this. if (framework->executors.empty() && framework->pending.empty()) { removeFramework(framework); } + return; } @@ -1373,6 +1386,8 @@ void Slave::_runTask( // manager to stop retrying for its un-acked updates. statusUpdate(update, UPID()); + // Refer to the comment after 'framework->pending.erase' above + // for why we need this. if (framework->executors.empty() && framework->pending.empty()) { removeFramework(framework); } @@ -1380,28 +1395,75 @@ void Slave::_runTask( return; } - // NOTE: If the task or executor uses persistent volumes, the slave - // should already know about it. In case the slave doesn't know - // about them (e.g., CheckpointResourcesMessage was dropped or came - // out of order), we simply fail the slave to be safe. - Resources volumes = Resources(task.resources()).persistentVolumes(); + // NOTE: If the task or executor uses resources that are + // checkpointed on the slave (e.g. persistent volumes), we should + // already know about it. If the slave doesn't know about them (e.g. + // CheckpointResourcesMessage was dropped or came out of order), + // we send TASK_LOST status updates here since restarting the task + // may succeed in the event that CheckpointResourcesMessage arrives + // out of order. 
+ Resources checkpointedTaskResources = + Resources(task.resources()).filter(needCheckpointing); - foreach (const Resource& volume, volumes) { - CHECK(checkpointedResources.contains(volume)) - << "Unknown persistent volume " << volume - << " for task " << task.task_id() - << " of framework " << frameworkId; + foreach (const Resource& resource, checkpointedTaskResources) { + if (!checkpointedResources.contains(resource)) { + LOG(WARNING) << "Unknown checkpointed resource " << resource + << " for task " << task.task_id() + << " of framework " << frameworkId; + + const StatusUpdate update = protobuf::createStatusUpdate( + frameworkId, + info.id(), + task.task_id(), + TASK_LOST, + TaskStatus::SOURCE_SLAVE, + "The checkpointed resources being used by the task are unknown to " + "the slave", + TaskStatus::REASON_RESOURCES_UNKNOWN); + + statusUpdate(update, UPID()); + + // Refer to the comment after 'framework->pending.erase' above + // for why we need this. + if (framework->executors.empty() && framework->pending.empty()) { + removeFramework(framework); + } + + return; + } } if (task.has_executor()) { - Resources volumes = - Resources(task.executor().resources()).persistentVolumes(); - - foreach (const Resource& volume, volumes) { - CHECK(checkpointedResources.contains(volume)) - << "Unknown persistent volume " << volume - << " for executor " << task.executor().executor_id() - << " of framework " << frameworkId; + Resources checkpointedExecutorResources = + Resources(task.executor().resources()).filter(needCheckpointing); + + foreach (const Resource& resource, checkpointedExecutorResources) { + if (!checkpointedResources.contains(resource)) { + LOG(WARNING) << "Unknown checkpointed resource " << resource + << " for executor " << task.executor().executor_id() + << " of framework " << frameworkId; + + const StatusUpdate update = protobuf::createStatusUpdate( + frameworkId, + info.id(), + task.task_id(), + TASK_LOST, + TaskStatus::SOURCE_SLAVE, + "The checkpointed resources 
being used by the executor are unknown " + "to the slave", + TaskStatus::REASON_RESOURCES_UNKNOWN, + task.executor().executor_id()); + + statusUpdate(update, UPID()); + + // Refer to the comment after 'framework->pending.erase' above + // for why we need this. + if (framework->executors.empty() && framework->pending.empty()) { + removeFramework(framework); + } + + return; + } } } @@ -1415,6 +1477,12 @@ void Slave::_runTask( << " of framework " << frameworkId << " because the slave is terminating"; + // Refer to the comment after 'framework->pending.erase' above + // for why we need this. + if (framework->executors.empty() && framework->pending.empty()) { + removeFramework(framework); + } + // We don't send a TASK_LOST here because the slave is // terminating. return; @@ -1507,6 +1575,12 @@ void Slave::_runTask( << " is in unexpected state " << executor->state; break; } + + // Refer to the comment after 'framework->pending.erase' above + // for why we need this. + if (framework->executors.empty() && framework->pending.empty()) { + removeFramework(framework); + } }
