This is an automated email from the ASF dual-hosted git repository.

bennoe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 1915150c6a83cd95197e25a68a6adf9b3ef5fb11 Author: Benno Evers <[email protected]> AuthorDate: Fri Mar 22 17:51:34 2019 +0100 Added new example framework for operation feedback. This adds a new example framework showcasing a possible implementation of the newly added operation feedback API. Review: https://reviews.apache.org/r/70282 --- src/Makefile.am | 6 + src/examples/CMakeLists.txt | 2 + src/examples/operation_feedback_framework.cpp | 861 +++++++++++++++++++++++++ src/tests/CMakeLists.txt | 1 + src/tests/examples_tests.cpp | 3 + src/tests/operation_feedback_framework_test.sh | 44 ++ 6 files changed, 917 insertions(+) diff --git a/src/Makefile.am b/src/Makefile.am index bcafe48..7c2131a 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -2274,6 +2274,11 @@ test_http_executor_SOURCES = examples/test_http_executor.cpp test_http_executor_CPPFLAGS = $(MESOS_CPPFLAGS) test_http_executor_LDADD = libmesos.la $(LDADD) +check_PROGRAMS += operation-feedback-framework +operation_feedback_framework_SOURCES = examples/operation_feedback_framework.cpp +operation_feedback_framework_CPPFLAGS = $(MESOS_CPPFLAGS) +operation_feedback_framework_LDADD = libmesos.la $(LDADD) + check_PROGRAMS += long-lived-framework long_lived_framework_SOURCES = examples/long_lived_framework.cpp long_lived_framework_CPPFLAGS = $(MESOS_CPPFLAGS) @@ -2796,6 +2801,7 @@ dist_check_SCRIPTS += \ tests/java_v0_framework_test.sh \ tests/java_v1_framework_test.sh \ tests/no_executor_framework_test.sh \ + tests/operation_feedback_framework_test.sh \ tests/persistent_volume_framework_test.sh \ tests/python_framework_test.sh \ tests/test_http_framework_test.sh \ diff --git a/src/examples/CMakeLists.txt b/src/examples/CMakeLists.txt index f477539..a811a07 100644 --- a/src/examples/CMakeLists.txt +++ b/src/examples/CMakeLists.txt @@ -45,6 +45,7 @@ if (NOT WIN32) add_executable(load-generator-framework load_generator_framework.cpp) add_executable(long-lived-executor long_lived_executor.cpp) add_executable(long-lived-framework long_lived_framework.cpp) + add_executable(operation-feedback-framework operation_feedback_framework.cpp) add_executable(no-executor-framework no_executor_framework.cpp) add_executable(persistent-volume-framework persistent_volume_framework.cpp) add_executable(test-executor test_executor.cpp) @@ -84,6 +85,7 @@ if (NOT WIN32) target_link_libraries(load-generator-framework PRIVATE mesos) target_link_libraries(long-lived-executor PRIVATE mesos) target_link_libraries(long-lived-framework PRIVATE mesos) + target_link_libraries(operation-feedback-framework PRIVATE mesos) target_link_libraries(no-executor-framework PRIVATE mesos) target_link_libraries(persistent-volume-framework PRIVATE mesos) target_link_libraries(test-executor PRIVATE mesos) diff --git a/src/examples/operation_feedback_framework.cpp b/src/examples/operation_feedback_framework.cpp new file mode 100644 index 0000000..2480c34 --- /dev/null +++ b/src/examples/operation_feedback_framework.cpp @@ -0,0 +1,861 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include <string> +#include <vector> + +#include <mesos/type_utils.hpp> + +#include <mesos/v1/mesos.hpp> +#include <mesos/v1/resources.hpp> +#include <mesos/v1/scheduler.hpp> + +#include <mesos/authorizer/acls.hpp> + +#include <process/defer.hpp> +#include <process/delay.hpp> +#include <process/process.hpp> +#include <process/protobuf.hpp> + +#include <stout/check.hpp> +#include <stout/exit.hpp> +#include <stout/flags.hpp> +#include <stout/option.hpp> +#include <stout/os.hpp> +#include <stout/protobuf.hpp> +#include <stout/stringify.hpp> +#include <stout/try.hpp> +#include <stout/uuid.hpp> + +#include "common/protobuf_utils.hpp" + +#include "examples/flags.hpp" + +// TODO(bevers): Write a version of `isTerminalState()` that is available +// from public headers and can handle v1 protobufs. +#include "internal/devolve.hpp" + +#include "logging/logging.hpp" + +using process::Owned; + +using std::string; +using std::vector; + +using mesos::ACL; +using mesos::ACLs; + +using mesos::v1::AgentID; +using mesos::v1::Credential; +using mesos::v1::FrameworkID; +using mesos::v1::FrameworkInfo; +using mesos::v1::Offer; +using mesos::v1::OperationID; +using mesos::v1::OperationState; +using mesos::v1::OperationStatus; +using mesos::v1::Resource; +using mesos::v1::Resources; +using mesos::v1::TaskID; +using mesos::v1::TaskInfo; +using mesos::v1::TaskStatus; + +using mesos::v1::OPERATION_FINISHED; +using mesos::v1::OPERATION_GONE_BY_OPERATOR; +using mesos::v1::OPERATION_FAILED; +using mesos::v1::OPERATION_ERROR; +using mesos::v1::OPERATION_DROPPED; +using mesos::v1::OPERATION_RECOVERING; +using mesos::v1::OPERATION_PENDING; +using mesos::v1::OPERATION_UNREACHABLE; +using mesos::v1::OPERATION_UNKNOWN; +using mesos::v1::OPERATION_UNSUPPORTED; + +using mesos::v1::TASK_DROPPED; +using mesos::v1::TASK_LOST; +using mesos::v1::TASK_ERROR; +using mesos::v1::TASK_FAILED; +using mesos::v1::TASK_FINISHED; +using mesos::v1::TASK_GONE; +using mesos::v1::TASK_GONE_BY_OPERATOR; +using mesos::v1::TASK_KILLING; +using mesos::v1::TASK_KILLED; +using mesos::v1::TASK_RUNNING; +using mesos::v1::TASK_STAGING; +using mesos::v1::TASK_STARTING; +using mesos::v1::TASK_UNREACHABLE; +using mesos::v1::TASK_UNKNOWN; + +using mesos::v1::scheduler::Call; +using mesos::v1::scheduler::Event; +using mesos::v1::scheduler::Mesos; + + +namespace { + +constexpr char FRAMEWORK_NAME[] = "Operation Feedback Framework (C++)"; +constexpr Duration RECONCILIATION_INTERVAL = Seconds(30); +constexpr Duration RESUBSCRIPTION_INTERVAL = Seconds(2); + +} // namespace { + + +// This framework will run a set of given tasks on a Mesos cluster. +// For each task, the framework will attempt to `RESERVE` task resources, +// then `LAUNCH` the task and finally `UNRESERVE` task resources. +// +// This is very similar to what the `DynamicReservationFramework` does, but +// this one does so using the v1 scheduler API and offer operation feedback +// to showcase both of these features. 
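+//
+// ("Operation feedback" means that the scheduler attaches an `id` to the
+// offer operations in its `ACCEPT` calls and later receives
+// `UPDATE_OPERATION_STATUS` events for them, which it acknowledges via
+// `ACKNOWLEDGE_OPERATION_STATUS` calls; see the handlers below.)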
+// +// In particular, if everything works smoothly every task passes sequentially +// through the following lifecycle stages: +// +// 1) AWAITING_RESERVE_OFFER +// The framework is waiting for a suitable offer containing +// resources to reserve. +// +// 2) AWAITING_RESERVE_ACK +// The framework attempted to reserve resources and is awaiting +// confirmation via offer operation feedback. +// +// 3) AWAITING_LAUNCH_OFFER +// The resources for this task have been successfully reserved, and it +// is awaiting a suitable offer to launch the task on the reservation. +// +// 4) AWAITING_TASK_FINISHED +// The task has been launched, and the framework is waiting for it to +// finish. +// +// 5) AWAITING_UNRESERVE_OFFER +// The task has finished successfully, and the framework is waiting for +// another offer containing this task's reservation so it can clean up. +// +// 6) AWAITING_UNRESERVE_ACK +// The framework attempted to unreserve the resources for this task. +// +// +// Note that some failure conditions are not handled by this example framework: +// - If an agent containing the reservation for one of the tasks is permanently +// removed before the task finished, this is not detected and no attempt is +// made to move the reservation to another agent. Instead, the task will be +// left waiting for an offer from that agent forever. +// +// - If the framework is killed or shut down before all reservations have been +// unreserved, these left-over reservation require manual cleanup. +// +// - The framework does not currently suppress or revive offers. + +class OperationFeedbackScheduler + : public process::Process<OperationFeedbackScheduler> +{ + struct SchedulerTask; + +public: + OperationFeedbackScheduler( + const FrameworkInfo& framework, + const string& master, + const string& role, + const Option<Credential>& credential) + : framework_(framework), + master_(master), + role_(role), + credential_(credential) + { + } + + ~OperationFeedbackScheduler() override {} + + void addTask(const string& command, const Resources& resources) + { + static int taskIdCounter = 0; + ++taskIdCounter; + + Resource::ReservationInfo reservationInfo; + reservationInfo.set_type(Resource::ReservationInfo::DYNAMIC); + reservationInfo.set_role(role_); + reservationInfo.set_principal(framework_.principal()); + + SchedulerTask task; + task.stage = SchedulerTask::AWAITING_RESERVE_OFFER; + task.taskResources = resources; + task.taskResources.allocate(role_); + // The task will run on reserved resources. + Resources taskResourcesReserved = + task.taskResources.pushReservation(reservationInfo); + task.taskInfo.mutable_resources()->CopyFrom(taskResourcesReserved); + task.taskInfo.mutable_command()->set_shell(true); + task.taskInfo.mutable_command()->set_value(command); + task.taskInfo.mutable_task_id()->set_value( + "task-" + stringify(taskIdCounter)); + task.taskInfo.set_name( + "Operation Feedback Task " + stringify(taskIdCounter)); + + tasks_.push_back(task); + return; + } + +protected: + void initialize() override + { + // We initialize the library here to ensure that callbacks are only invoked + // after the process has spawned. 
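+    //
+    // The callbacks passed below are wrapped in `process::defer(self(), ...)`,
+    // so they are dispatched onto this scheduler process and handled serially
+    // instead of being invoked directly from the `Mesos` library's own
+    // threads.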
+ mesos_.reset( + new Mesos( + master_, + mesos::ContentType::PROTOBUF, + process::defer(self(), &Self::connected), + process::defer(self(), &Self::disconnected), + process::defer(self(), &Self::received, lambda::_1), + credential_)); + } + + void finalize() override + { + if (framework_.has_id()) { + Call call; + call.mutable_framework_id()->CopyFrom(framework_.id()); + call.set_type(Call::TEARDOWN); + mesos_->send(call); + } + } + + void connected() + { + LOG(INFO) << "Connected"; + reconcileOperations(); + subscribe(); + } + + void subscribe() + { + LOG(INFO) << "Sending `SUBSCRIBE` call to Mesos master"; + // The master can respond with an error to the `SUBSCRIBE` call, but the + // `Mesos` class will just silently swallow that error, leaving us hanging + // forever with no clue what's going on. + // In particular, running this with the `--master=local` flag frequently + // results in `503 Service Unavailable` responses. + // Therefore, we have to retry in a loop until we're actually registered. + if (!framework_.has_id()) { + mesos_->send(SUBSCRIBE(framework_)); + process::delay(RESUBSCRIPTION_INTERVAL, self(), &Self::subscribe); + } + } + + void disconnected() + { + EXIT(EXIT_FAILURE) << "Disconnected"; + } + + void received(std::queue<Event> events) + { + while (!events.empty()) { + Event event = events.front(); + events.pop(); + + VLOG(1) << "Received " << event.type() << " event"; + + switch (event.type()) { + case Event::SUBSCRIBED: { + framework_.mutable_id()->CopyFrom(event.subscribed().framework_id()); + LOG(INFO) << "Subscribed with ID '" << framework_.id(); + break; + } + + case Event::OFFERS: { + resourceOffers(google::protobuf::convert(event.offers().offers())); + break; + } + + case Event::UPDATE: { + taskStatusUpdate(event.update().status()); + break; + } + + case Event::UPDATE_OPERATION_STATUS: { + operationStatusUpdate(event.update_operation_status().status()); + break; + } + + case Event::ERROR: { + EXIT(EXIT_FAILURE) << "Error: " << event.error().message(); + break; + } + + default: + break; + } + } + } + + void resourceOffers(const vector<Offer>& offers) + { + foreach (const Offer& offer, offers) { + VLOG(1) << "Offer " << offer.id() + << " from agent " << offer.agent_id() + << " contained resources " << offer.resources(); + + Call call; + call.set_type(Call::ACCEPT); + call.mutable_framework_id()->CopyFrom(framework_.id()); + Call::Accept* accept = call.mutable_accept(); + accept->add_offer_ids()->CopyFrom(offer.id()); + + Resources remaining(offer.resources()); + int reservations = 0, launches = 0, unreservations = 0; + bool havePendingTasks = false; // Whether any task awaits an offer. + + // For each pending task that does not yet have a reservation, + // check whether the offer contains enough unreserved resources + // and if so, attempt to reserve them. 
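+      //
+      // Note that only operations carrying an `id` (the `RESERVE` and
+      // `UNRESERVE` operations created below) receive operation status
+      // updates; the `LAUNCH` operation has no id, so its progress is
+      // observed through ordinary task status updates instead.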
+ foreachTaskInState(SchedulerTask::AWAITING_RESERVE_OFFER, + [&] (SchedulerTask& task) { + havePendingTasks = true; + if (remaining.contains(task.taskResources)) { + LOG(INFO) << "Reserving resources for task " + << task.taskInfo.task_id(); + + Offer::Operation* operation = accept->add_operations(); + operation->set_type(Offer::Operation::RESERVE); + std::string id = "reserve-" + id::UUID::random().toString(); + operation->mutable_id()->set_value(id); + operation->mutable_reserve()->mutable_resources()->CopyFrom( + task.taskInfo.resources()); + + remaining -= task.taskResources; + task.taskInfo.mutable_agent_id()->CopyFrom(offer.agent_id()); + task.reserveOperationId = operation->id(); + task.stage = SchedulerTask::AWAITING_RESERVE_ACK; + ++reservations; + } + }); + + // For each pending task that had its resources reserved successfully, + // attempt to launch the task. + foreachTaskInState(SchedulerTask::AWAITING_LAUNCH_OFFER, + [&] (SchedulerTask& task) { + CHECK(task.taskInfo.has_agent_id()); + havePendingTasks = true; + if (offer.agent_id() == task.taskInfo.agent_id() && + remaining.contains(task.taskInfo.resources())) { + LOG(INFO) << "Launching task " << task.taskInfo.task_id(); + + Offer::Operation* operation = accept->add_operations(); + operation->set_type(Offer::Operation::LAUNCH); + operation->mutable_launch()->add_task_infos()->CopyFrom( + task.taskInfo); + + task.stage = SchedulerTask::AWAITING_TASK_FINISHED; + ++launches; + } + }); + + // For each task that finished running, attempt to unreserve its + // resources. + foreachTaskInState(SchedulerTask::AWAITING_UNRESERVE_OFFER, + [&] (SchedulerTask& task) { + CHECK(task.taskInfo.has_agent_id()); + havePendingTasks = true; + if (offer.agent_id() == task.taskInfo.agent_id()) { + LOG(INFO) << "Unreserving resources for task " + << task.taskInfo.task_id(); + + Offer::Operation* operation = accept->add_operations(); + operation->set_type(Offer::Operation::UNRESERVE); + std::string id = "unreserve-" + id::UUID::random().toString(); + operation->mutable_id()->set_value(id); + operation->mutable_unreserve()->mutable_resources()->CopyFrom( + task.taskInfo.resources()); + + task.unreserveOperationId = operation->id(); + task.stage = SchedulerTask::AWAITING_UNRESERVE_ACK; + ++unreservations; + } + }); + + if (havePendingTasks) { + LOG(INFO) << "Accepting offer with " + << reservations << " `RESERVE` operations, " + << launches << " `LAUNCH` operations and " + << unreservations << " `UNRESERVE` operations while having " + << tasks_.size() << " non-completed tasks"; + } + + // Each `ACCEPT` call must only contain offers with the same agent + // id, so we have to send one call per offer back to the master. + // We also want to send the `ACCEPT` call if we don't launch any + // operations on this offer, in order to decline the offer. + mesos_->send(call); + } + } + + void taskStatusUpdate(const TaskStatus& status) + { + VLOG(1) << "Received status update " << status; + + const TaskID& taskId = status.task_id(); + + auto taskIterator = std::find_if(tasks_.begin(), tasks_.end(), + [&] (const SchedulerTask& tx) { + return tx.taskInfo.task_id() == taskId; + }); + + if (taskIterator == tasks_.end()) { + LOG(WARNING) << "Status update for unknown task " << taskId; + return; + } + + if (status.has_uuid()) { + mesos_->send(ACKNOWLEDGE(status, framework_.id())); + } + + if (taskIterator->stage != SchedulerTask::AWAITING_TASK_FINISHED) { + // We could get spurious updates due to the reconciliation process + // or because of retries; ignore them. 
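+      //
+      // The update was still acknowledged above; updates that carry a UUID
+      // and are left unacknowledged would be retried by the agent.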
+ return; + } + + switch (status.state()) { + case TASK_STAGING: + case TASK_KILLING: + case TASK_STARTING: + case TASK_RUNNING: + case TASK_UNREACHABLE: { + // Nothing to do yet, wait for further updates. + VLOG(1) << "Received " << status.state() << " for task " << taskId; + break; + } + + case TASK_DROPPED: + case TASK_ERROR: + case TASK_KILLED: + case TASK_GONE: + case TASK_GONE_BY_OPERATOR: + case TASK_UNKNOWN: + case TASK_LOST: + case TASK_FAILED: { + LOG(INFO) << "Task " << taskId << " failed, attempting to relaunch" + << " with the next offer"; + + taskIterator->stage = SchedulerTask::AWAITING_LAUNCH_OFFER; + break; + } + + case TASK_FINISHED: { + LOG(INFO) << "Task " << taskId << " finished, attempting to unreserve" + << " its resources with the next offer"; + + taskIterator->stage = SchedulerTask::AWAITING_UNRESERVE_OFFER; + break; + } + } + } + + void operationStatusUpdate(const OperationStatus& status) + { + VLOG(1) << "Received operation status update " << status; + + if (status.has_uuid()) { + mesos_->send(ACKNOWLEDGE_OPERATION(status, framework_.id())); + } + + const OperationID& operationId = status.operation_id(); + + auto taskIterator = std::find_if(tasks_.begin(), tasks_.end(), + [&] (const SchedulerTask& tx) { + return tx.reserveOperationId == operationId; + }); + + if (taskIterator != tasks_.end()) { + handleReserveOperationStatusUpdate(taskIterator, status); + return; + } + + taskIterator = std::find_if(tasks_.begin(), tasks_.end(), + [&] (const SchedulerTask& tx) { + return tx.unreserveOperationId == operationId; + }); + + if (taskIterator != tasks_.end()) { + handleUnreserveOperationStatusUpdate(taskIterator, status); + return; + } + + LOG(WARNING) << "Status update for unknown operation " << operationId; + } + + void handleReserveOperationStatusUpdate( + typename std::list<SchedulerTask>::iterator task, + const OperationStatus& status) + { + if (task->stage != SchedulerTask::AWAITING_RESERVE_ACK) { + // We could get spurious updates due to the reconciliation process + // or because of retries; ignore them. + return; + } + + switch (status.state()) { + case OPERATION_PENDING: + case OPERATION_RECOVERING: + case OPERATION_UNKNOWN: + case OPERATION_UNREACHABLE: { + // Nothing to do but wait. + break; + } + + case OPERATION_UNSUPPORTED: { + // Someone in the future invented an additional operation state + // and accidentally sent it to us. + break; + } + + case OPERATION_FAILED: + case OPERATION_ERROR: + case OPERATION_DROPPED: + case OPERATION_GONE_BY_OPERATOR: { + LOG(INFO) + << "Received update " << status << " attempting to reserve " + << " resources for task " << task->taskInfo.task_id() + << "; retrying"; + + task->stage = SchedulerTask::AWAITING_RESERVE_OFFER; + break; + } + + case OPERATION_FINISHED: { + LOG(INFO) << "Successfully reserved resources for task " + << task->taskInfo.task_id() << "; awaiting launch offer"; + + task->stage = SchedulerTask::AWAITING_LAUNCH_OFFER; + break; + } + } + } + + void handleUnreserveOperationStatusUpdate( + typename std::list<SchedulerTask>::iterator task, + const OperationStatus& status) + { + if (task->stage != SchedulerTask::AWAITING_UNRESERVE_ACK) { + // We could get spurious updates due to the reconciliation + // or because of retries; ignore them. + return; + } + + switch (status.state()) { + case OPERATION_PENDING: + case OPERATION_RECOVERING: + case OPERATION_UNKNOWN: + case OPERATION_UNREACHABLE: { + // Nothing to do but wait. 
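+        //
+        // The periodic `RECONCILE_OPERATIONS` call issued by
+        // `reconcileOperations()` below will keep asking the master for the
+        // latest known state of this operation.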
+ break; + } + + case OPERATION_UNSUPPORTED: { + // Someone in the future invented an additional operation state + // and accidentally sent it to us. + break; + } + + case OPERATION_FAILED: + case OPERATION_ERROR: + case OPERATION_DROPPED: { + LOG(INFO) + << "Received update " << status << " attempting to unreserve " + << " resources for task " << task->taskInfo.task_id() + << "; retrying"; + + task->stage = SchedulerTask::AWAITING_UNRESERVE_OFFER; + break; + } + + case OPERATION_FINISHED: + case OPERATION_GONE_BY_OPERATOR: { + // We also count `GONE_BY_OPERATOR` as a success, because in that + // case there's nothing left to unreserve. + LOG(INFO) << "Task " << task->taskInfo.task_id() << " done; removing"; + tasks_.erase(task); + if (tasks_.empty()) { + LOG(INFO) << "All tasks completed, shutting down the framework..."; + process::terminate(self()); + } + break; + } + } + } + + void reconcileOperations() { + // Keep reconciling as long as the framework is running. + process::delay(RECONCILIATION_INTERVAL, self(), &Self::reconcileOperations); + + if (!framework_.has_id()) { + return; + } + + Call call; + call.set_type(Call::RECONCILE_OPERATIONS); + call.mutable_framework_id()->CopyFrom(framework_.id()); + Call::ReconcileOperations* reconcile = call.mutable_reconcile_operations(); + + for (const SchedulerTask& task : tasks_) { + switch (task.stage) { + case SchedulerTask::AWAITING_RESERVE_OFFER: + case SchedulerTask::AWAITING_UNRESERVE_OFFER: + case SchedulerTask::AWAITING_LAUNCH_OFFER: + case SchedulerTask::AWAITING_TASK_FINISHED: { + // Not currently waiting for any offer feedback. + break; + } + + case SchedulerTask::AWAITING_RESERVE_ACK: { + Call::ReconcileOperations::Operation* operation = + reconcile->add_operations(); + operation->mutable_agent_id()->MergeFrom(task.taskInfo.agent_id()); + operation->mutable_operation_id()->MergeFrom( + task.reserveOperationId.get()); + } + + case SchedulerTask::AWAITING_UNRESERVE_ACK: { + Call::ReconcileOperations::Operation* operation = + reconcile->add_operations(); + operation->mutable_agent_id()->MergeFrom(task.taskInfo.agent_id()); + operation->mutable_operation_id()->MergeFrom( + task.unreserveOperationId.get()); + } + } + } + + mesos_->send(call); + } + +private: + // Static helper functions to deal with protobuf generation: + + static Call SUBSCRIBE(const FrameworkInfo& framework) + { + Call call; + call.set_type(Call::SUBSCRIBE); + + if (framework.has_id()) { + call.mutable_framework_id()->CopyFrom(framework.id()); + } + + Call::Subscribe* subscribe = call.mutable_subscribe(); + subscribe->mutable_framework_info()->CopyFrom(framework); + + return call; + } + + static Call ACKNOWLEDGE( + const TaskStatus& status, + const FrameworkID& frameworkId) + { + CHECK(status.has_uuid()); + Call call; + call.set_type(Call::ACKNOWLEDGE); + call.mutable_framework_id()->CopyFrom(frameworkId); + + Call::Acknowledge* acknowledge = call.mutable_acknowledge(); + acknowledge->mutable_agent_id()->CopyFrom(status.agent_id()); + acknowledge->mutable_task_id()->CopyFrom(status.task_id()); + acknowledge->set_uuid(std::string(status.uuid())); + + return call; + } + + static Call ACKNOWLEDGE_OPERATION( + const OperationStatus& status, + const FrameworkID& frameworkId) + { + CHECK(status.has_uuid()); + Call call; + call.set_type(Call::ACKNOWLEDGE_OPERATION_STATUS); + call.mutable_framework_id()->CopyFrom(frameworkId); + + Call::AcknowledgeOperationStatus* acknowledge = + call.mutable_acknowledge_operation_status(); + 
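+    // The acknowledgement echoes the agent id, operation id and status uuid
+    // from the update. (For operations on resource provider resources the
+    // `resource_provider_id` would need to be set as well, but this example
+    // only reserves agent default resources.)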
acknowledge->mutable_agent_id()->CopyFrom(status.agent_id()); + acknowledge->mutable_operation_id()->CopyFrom(status.operation_id()); + acknowledge->set_uuid(status.uuid().value()); + + return call; + } + + Owned<Mesos> mesos_; + FrameworkInfo framework_; + string master_; + string role_; + Option<Credential> credential_; + + // Represents a task lifecycle from the scheduler's perspective, i.e. + // a reserve operation followed by a mesos task followed by an unreserve + // operation. + struct SchedulerTask { + enum Stage { + AWAITING_RESERVE_OFFER, + AWAITING_RESERVE_ACK, + AWAITING_LAUNCH_OFFER, + AWAITING_TASK_FINISHED, + AWAITING_UNRESERVE_OFFER, + AWAITING_UNRESERVE_ACK, + }; + + Stage stage; + + TaskInfo taskInfo; + // Since the resources inside `taskInfo` already contain a + // `ReservationInfo`, we store an extra copy without that as a + // convenience for offer matching. + Resources taskResources; + Option<OperationID> reserveOperationId; + Option<OperationID> unreserveOperationId; + }; + + std::list<SchedulerTask> tasks_; + + // This function needs to have `SchedulerTask::Stage` be already defined. + template<typename UnaryOperation> + void foreachTaskInState( + SchedulerTask::Stage stage, + UnaryOperation f) + { + for (SchedulerTask& task : tasks_) { + if (task.stage == stage) { + f(task); + } + } + } +}; + + +class Flags : public virtual mesos::internal::examples::Flags +{ +public: + Flags() + { + add(&Flags::user, + "user", + "The username under which to run tasks."); + + add(&Flags::command, + "command", + "The command to run for each task.", + "sleep 60"); + + add(&Flags::resources, + "resources", + "The resources to reserve for each task.", + "cpus:1;mem:32"); + + add(&Flags::num_tasks, + "num_tasks", + "The number of task.", + 60); + } + + string command; + string resources; + Option<string> user; + int num_tasks; +}; + + +int main(int argc, char** argv) +{ + Flags flags; + Try<flags::Warnings> load = flags.load("MESOS_EXAMPLE_", argc, argv); + + if (flags.help) { + std::cout << flags.usage() << std::endl; + return EXIT_SUCCESS; + } + + if (load.isError()) { + std::cerr << flags.usage(load.error()) << std::endl; + return EXIT_FAILURE; + } + + mesos::internal::logging::initialize(argv[0], false); + foreach (const flags::Warning& warning, load->warnings) { + LOG(WARNING) << warning.message; + } + + if (flags.role == "*") { + EXIT(EXIT_FAILURE) << flags.usage( + "Role is incorrect; the default '*' role cannot be used"); + } + + Option<Credential> credential = None(); + + if (flags.authenticate) { + LOG(INFO) << "Enabling authentication for the framework"; + + Credential credential_; + credential_.set_principal(flags.principal); + if (flags.secret.isSome()) { + credential_.set_secret(flags.secret.get()); + } + credential = credential_; + } + + Try<Resources> parsedResources = Resources::parse(flags.resources); + if (parsedResources.isError()) { + EXIT(EXIT_FAILURE) + << "Failed to parse resources: " << parsedResources.error(); + } + + FrameworkInfo framework; + framework.set_user(flags.user.isSome() ? 
flags.user.get() : os::user().get()); + framework.set_principal(flags.principal); + framework.set_name(FRAMEWORK_NAME); + framework.set_checkpoint(flags.checkpoint); + framework.add_roles(flags.role); + framework.add_capabilities()->set_type( + FrameworkInfo::Capability::MULTI_ROLE); + framework.add_capabilities()->set_type( + FrameworkInfo::Capability::PARTITION_AWARE); + framework.add_capabilities()->set_type( + FrameworkInfo::Capability::RESERVATION_REFINEMENT); + + LOG(INFO) << "Starting OperationFeedbackScheduler for " << flags.num_tasks + << " tasks with command " << flags.command; + + OperationFeedbackScheduler scheduler( + framework, + flags.master, + flags.role, + credential); + + for (int i=0; i < flags.num_tasks; ++i) { + scheduler.addTask(flags.command, parsedResources.get()); + } + + if (flags.master == "local") { + // Configure master. The constructor of `Mesos()` will load all + // environment variables prefixed by `MESOS_`. + os::setenv("MESOS_AUTHENTICATE_FRAMEWORKS", stringify(flags.authenticate)); + + ACLs acls; + ACL::RegisterFramework* acl = acls.add_register_frameworks(); + acl->mutable_principals()->set_type(ACL::Entity::ANY); + acl->mutable_roles()->add_values(flags.role); + os::setenv("MESOS_ACLS", stringify(JSON::protobuf(acls))); + } + + process::spawn(&scheduler); + process::wait(&scheduler); + + return EXIT_SUCCESS; +} diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt index ab7f6c2..f3acd82 100644 --- a/src/tests/CMakeLists.txt +++ b/src/tests/CMakeLists.txt @@ -357,6 +357,7 @@ if (NOT WIN32) load-generator-framework long-lived-executor long-lived-framework + operation-feedback-framework no-executor-framework persistent-volume-framework test-executor diff --git a/src/tests/examples_tests.cpp b/src/tests/examples_tests.cpp index cadf1d0..9c7b28b 100644 --- a/src/tests/examples_tests.cpp +++ b/src/tests/examples_tests.cpp @@ -34,6 +34,9 @@ TEST_SCRIPT(ExamplesTest, DynamicReservationFramework, TEST_SCRIPT(ExamplesTest, DiskFullFramework, "disk_full_framework_test.sh") +TEST_SCRIPT(ExamplesTest, OperationFeedbackFramework, + "operation_feedback_framework_test.sh") + #ifdef MESOS_HAS_JAVA TEST_SCRIPT(ExamplesTest, JavaFramework, "java_framework_test.sh") TEST_SCRIPT(ExamplesTest, JavaException, "java_exception_test.sh") diff --git a/src/tests/operation_feedback_framework_test.sh b/src/tests/operation_feedback_framework_test.sh new file mode 100755 index 0000000..168b0db --- /dev/null +++ b/src/tests/operation_feedback_framework_test.sh @@ -0,0 +1,44 @@ +#!/usr/bin/env bash + +# Expecting MESOS_SOURCE_DIR and MESOS_BUILD_DIR to be in environment. + +env | grep MESOS_SOURCE_DIR >/dev/null + +test $? != 0 && \ + echo "Failed to find MESOS_SOURCE_DIR in environment" && \ + exit 1 + +env | grep MESOS_BUILD_DIR >/dev/null + +test $? != 0 && \ + echo "Failed to find MESOS_BUILD_DIR in environment" && \ + exit 1 + +source ${MESOS_SOURCE_DIR}/support/colors.sh +source ${MESOS_SOURCE_DIR}/support/atexit.sh + +MESOS_WORK_DIR=`mktemp -d -t mesos-XXXXXX` + +atexit "rm -rf ${MESOS_WORK_DIR}" +export MESOS_WORK_DIR=${MESOS_WORK_DIR} + +MESOS_RUNTIME_DIR=`mktemp -d -t mesos-XXXXXX` + +atexit "rm -rf ${MESOS_RUNTIME_DIR}" +export MESOS_RUNTIME_DIR=${MESOS_RUNTIME_DIR} + +# Lower the authentication timeout to speed up the test (the master +# may drop the authentication message while it is recovering). +export MESOS_AUTHENTICATION_TIMEOUT=200ms + +# Set local Mesos runner to use 1 agent +export MESOS_NUM_SLAVES=1 + +# Set isolation for the slave. 
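+# (Like the other MESOS_* variables above, this configures the in-process
+# agent that the local runner spawns when the framework is started with
+# `--master=local`.)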
+export MESOS_ISOLATION="filesystem/posix,posix/cpu,posix/mem" + +# Set launcher for the slave. +export MESOS_LAUNCHER="posix" + +# Check that the C++ test framework executes without crashing (returns 0). +exec ${MESOS_HELPER_DIR}/operation-feedback-framework --master=local --role=test --num_tasks=1 --command="echo hello"
