http://git-wip-us.apache.org/repos/asf/mesos/blob/b923cb52/src/tests/scheduler_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp index 9d29d1a..3f01c06 100644 --- a/src/tests/scheduler_tests.cpp +++ b/src/tests/scheduler_tests.cpp @@ -18,32 +18,29 @@ #include <gmock/gmock.h> -#include <memory> #include <string> #include <queue> -#include <vector> #include <mesos/executor.hpp> -#include <mesos/scheduler.hpp> -#include <mesos/type_utils.hpp> -#include <mesos/scheduler/scheduler.hpp> +#include <mesos/v1/mesos.hpp> +#include <mesos/v1/resources.hpp> +#include <mesos/v1/scheduler.hpp> + +#include <mesos/v1/scheduler/scheduler.hpp> #include <process/clock.hpp> #include <process/future.hpp> #include <process/gmock.hpp> #include <process/gtest.hpp> -#include <process/http.hpp> -#include <process/owned.hpp> #include <process/pid.hpp> #include <process/queue.hpp> -#include <process/metrics/metrics.hpp> - -#include <stout/json.hpp> #include <stout/lambda.hpp> #include <stout/try.hpp> -#include <stout/uuid.hpp> + +#include "internal/devolve.hpp" +#include "internal/evolve.hpp" #include "master/allocator/mesos/allocator.hpp" @@ -59,23 +56,16 @@ using mesos::internal::master::Master; using mesos::internal::slave::Containerizer; using mesos::internal::slave::Slave; -using mesos::scheduler::Call; -using mesos::scheduler::Event; +using mesos::v1::scheduler::Call; +using mesos::v1::scheduler::Event; +using mesos::v1::scheduler::Mesos; using process::Clock; using process::Future; -using process::Owned; using process::PID; -using process::Promise; using process::Queue; -using process::http::OK; - -using process::metrics::internal::MetricsProcess; - using std::string; -using std::queue; -using std::vector; using testing::_; using testing::AtMost; @@ -126,9 +116,9 @@ TEST_F(SchedulerTest, Subscribe) EXPECT_CALL(callbacks, connected()) .WillOnce(FutureSatisfy(&connected)); - scheduler::Mesos mesos( + Mesos mesos( master.get(), - DEFAULT_CREDENTIAL, + DEFAULT_V1_CREDENTIAL, lambda::bind(&Callbacks::connected, lambda::ref(callbacks)), lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)), lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1)); @@ -145,7 +135,7 @@ TEST_F(SchedulerTest, Subscribe) call.set_type(Call::SUBSCRIBE); Call::Subscribe* subscribe = call.mutable_subscribe(); - subscribe->mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO); + subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO); subscribe->set_force(true); mesos.send(call); @@ -155,7 +145,7 @@ TEST_F(SchedulerTest, Subscribe) AWAIT_READY(event); EXPECT_EQ(Event::SUBSCRIBED, event.get().type()); - FrameworkID id(event.get().subscribed().framework_id()); + v1::FrameworkID id(event.get().subscribed().framework_id()); // Resubscribe with the same framework id. { @@ -164,7 +154,7 @@ TEST_F(SchedulerTest, Subscribe) call.set_type(Call::SUBSCRIBE); Call::Subscribe* subscribe = call.mutable_subscribe(); - subscribe->mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO); + subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO); subscribe->mutable_framework_info()->mutable_id()->CopyFrom(id); subscribe->set_force(true); @@ -198,9 +188,9 @@ TEST_F(SchedulerTest, TaskRunning) EXPECT_CALL(callbacks, connected()) .WillOnce(FutureSatisfy(&connected)); - scheduler::Mesos mesos( + Mesos mesos( master.get(), - DEFAULT_CREDENTIAL, + DEFAULT_V1_CREDENTIAL, lambda::bind(&Callbacks::connected, lambda::ref(callbacks)), lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)), lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1)); @@ -217,7 +207,7 @@ TEST_F(SchedulerTest, TaskRunning) call.set_type(Call::SUBSCRIBE); Call::Subscribe* subscribe = call.mutable_subscribe(); - subscribe->mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO); + subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO); mesos.send(call); } @@ -226,7 +216,7 @@ TEST_F(SchedulerTest, TaskRunning) AWAIT_READY(event); EXPECT_EQ(Event::SUBSCRIBED, event.get().type()); - FrameworkID id(event.get().subscribed().framework_id()); + v1::FrameworkID id(event.get().subscribed().framework_id()); event = events.get(); AWAIT_READY(event); @@ -245,14 +235,14 @@ TEST_F(SchedulerTest, TaskRunning) Return(Nothing()))) .WillRepeatedly(Return(Future<Nothing>())); // Ignore subsequent calls. - TaskInfo taskInfo; + v1::TaskInfo taskInfo; taskInfo.set_name(""); taskInfo.mutable_task_id()->set_value("1"); - taskInfo.mutable_slave_id()->CopyFrom( - event.get().offers().offers(0).slave_id()); + taskInfo.mutable_agent_id()->CopyFrom( + event.get().offers().offers(0).agent_id()); taskInfo.mutable_resources()->CopyFrom( event.get().offers().offers(0).resources()); - taskInfo.mutable_executor()->CopyFrom(DEFAULT_EXECUTOR_INFO); + taskInfo.mutable_executor()->CopyFrom(DEFAULT_V1_EXECUTOR_INFO); // TODO(benh): Enable just running a task with a command in the tests: // taskInfo.mutable_command()->set_value("sleep 10"); @@ -265,8 +255,8 @@ TEST_F(SchedulerTest, TaskRunning) Call::Accept* accept = call.mutable_accept(); accept->add_offer_ids()->CopyFrom(event.get().offers().offers(0).id()); - Offer::Operation* operation = accept->add_operations(); - operation->set_type(Offer::Operation::LAUNCH); + v1::Offer::Operation* operation = accept->add_operations(); + operation->set_type(v1::Offer::Operation::LAUNCH); operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo); mesos.send(call); @@ -275,9 +265,9 @@ TEST_F(SchedulerTest, TaskRunning) event = events.get(); AWAIT_READY(event); EXPECT_EQ(Event::UPDATE, event.get().type()); - EXPECT_EQ(TASK_RUNNING, event.get().update().status().state()); + EXPECT_EQ(v1::TASK_RUNNING, event.get().update().status().state()); EXPECT_TRUE(event.get().update().status().has_executor_id()); - EXPECT_EQ(exec.id, event.get().update().status().executor_id()); + EXPECT_EQ(exec.id, devolve(event.get().update().status().executor_id())); AWAIT_READY(update); @@ -306,9 +296,9 @@ TEST_F(SchedulerTest, ReconcileTask) EXPECT_CALL(callbacks, connected()) .WillOnce(FutureSatisfy(&connected)); - scheduler::Mesos mesos( + Mesos mesos( master.get(), - DEFAULT_CREDENTIAL, + DEFAULT_V1_CREDENTIAL, lambda::bind(&Callbacks::connected, lambda::ref(callbacks)), lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)), lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1)); @@ -325,7 +315,7 @@ TEST_F(SchedulerTest, ReconcileTask) call.set_type(Call::SUBSCRIBE); Call::Subscribe* subscribe = call.mutable_subscribe(); - subscribe->mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO); + subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO); mesos.send(call); } @@ -334,7 +324,7 @@ TEST_F(SchedulerTest, ReconcileTask) AWAIT_READY(event); EXPECT_EQ(Event::SUBSCRIBED, event.get().type()); - FrameworkID id(event.get().subscribed().framework_id()); + v1::FrameworkID id(event.get().subscribed().framework_id()); event = events.get(); AWAIT_READY(event); @@ -347,8 +337,10 @@ TEST_F(SchedulerTest, ReconcileTask) EXPECT_CALL(exec, launchTask(_, _)) .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); - Offer offer = event.get().offers().offers(0); - TaskInfo taskInfo = createTask(offer, "", DEFAULT_EXECUTOR_ID); + v1::Offer offer = event.get().offers().offers(0); + + v1::TaskInfo taskInfo = + evolve(createTask(devolve(offer), "", DEFAULT_EXECUTOR_ID)); { Call call; @@ -358,8 +350,8 @@ TEST_F(SchedulerTest, ReconcileTask) Call::Accept* accept = call.mutable_accept(); accept->add_offer_ids()->CopyFrom(offer.id()); - Offer::Operation* operation = accept->add_operations(); - operation->set_type(Offer::Operation::LAUNCH); + v1::Offer::Operation* operation = accept->add_operations(); + operation->set_type(v1::Offer::Operation::LAUNCH); operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo); mesos.send(call); @@ -368,7 +360,7 @@ TEST_F(SchedulerTest, ReconcileTask) event = events.get(); AWAIT_READY(event); EXPECT_EQ(Event::UPDATE, event.get().type()); - EXPECT_EQ(TASK_RUNNING, event.get().update().status().state()); + EXPECT_EQ(v1::TASK_RUNNING, event.get().update().status().state()); { Call call; @@ -385,8 +377,8 @@ TEST_F(SchedulerTest, ReconcileTask) AWAIT_READY(event); EXPECT_EQ(Event::UPDATE, event.get().type()); EXPECT_FALSE(event.get().update().status().has_uuid()); - EXPECT_EQ(TASK_RUNNING, event.get().update().status().state()); - EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, + EXPECT_EQ(v1::TASK_RUNNING, event.get().update().status().state()); + EXPECT_EQ(v1::TaskStatus::REASON_RECONCILIATION, event.get().update().status().reason()); EXPECT_CALL(exec, shutdown(_)) @@ -414,9 +406,9 @@ TEST_F(SchedulerTest, KillTask) EXPECT_CALL(callbacks, connected()) .WillOnce(FutureSatisfy(&connected)); - scheduler::Mesos mesos( + Mesos mesos( master.get(), - DEFAULT_CREDENTIAL, + DEFAULT_V1_CREDENTIAL, lambda::bind(&Callbacks::connected, lambda::ref(callbacks)), lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)), lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1)); @@ -433,7 +425,7 @@ TEST_F(SchedulerTest, KillTask) call.set_type(Call::SUBSCRIBE); Call::Subscribe* subscribe = call.mutable_subscribe(); - subscribe->mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO); + subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO); mesos.send(call); } @@ -442,7 +434,7 @@ TEST_F(SchedulerTest, KillTask) AWAIT_READY(event); EXPECT_EQ(Event::SUBSCRIBED, event.get().type()); - FrameworkID id(event.get().subscribed().framework_id()); + v1::FrameworkID id(event.get().subscribed().framework_id()); event = events.get(); AWAIT_READY(event); @@ -455,8 +447,10 @@ TEST_F(SchedulerTest, KillTask) EXPECT_CALL(exec, launchTask(_, _)) .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); - Offer offer = event.get().offers().offers(0); - TaskInfo taskInfo = createTask(offer, "", DEFAULT_EXECUTOR_ID); + v1::Offer offer = event.get().offers().offers(0); + + v1::TaskInfo taskInfo = + evolve(createTask(devolve(offer), "", DEFAULT_EXECUTOR_ID)); { Call call; @@ -466,8 +460,8 @@ TEST_F(SchedulerTest, KillTask) Call::Accept* accept = call.mutable_accept(); accept->add_offer_ids()->CopyFrom(offer.id()); - Offer::Operation* operation = accept->add_operations(); - operation->set_type(Offer::Operation::LAUNCH); + v1::Offer::Operation* operation = accept->add_operations(); + operation->set_type(v1::Offer::Operation::LAUNCH); operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo); mesos.send(call); @@ -476,7 +470,7 @@ TEST_F(SchedulerTest, KillTask) event = events.get(); AWAIT_READY(event); EXPECT_EQ(Event::UPDATE, event.get().type()); - EXPECT_EQ(TASK_RUNNING, event.get().update().status().state()); + EXPECT_EQ(v1::TASK_RUNNING, event.get().update().status().state()); { // Acknowledge TASK_RUNNING update. @@ -486,7 +480,7 @@ TEST_F(SchedulerTest, KillTask) Call::Acknowledge* acknowledge = call.mutable_acknowledge(); acknowledge->mutable_task_id()->CopyFrom(taskInfo.task_id()); - acknowledge->mutable_slave_id()->CopyFrom(offer.slave_id()); + acknowledge->mutable_agent_id()->CopyFrom(offer.agent_id()); acknowledge->set_uuid(event.get().update().status().uuid()); mesos.send(call); @@ -502,7 +496,7 @@ TEST_F(SchedulerTest, KillTask) Call::Kill* kill = call.mutable_kill(); kill->mutable_task_id()->CopyFrom(taskInfo.task_id()); - kill->mutable_slave_id()->CopyFrom(offer.slave_id()); + kill->mutable_agent_id()->CopyFrom(offer.agent_id()); mesos.send(call); } @@ -510,7 +504,7 @@ TEST_F(SchedulerTest, KillTask) event = events.get(); AWAIT_READY(event); EXPECT_EQ(Event::UPDATE, event.get().type()); - EXPECT_EQ(TASK_KILLED, event.get().update().status().state()); + EXPECT_EQ(v1::TASK_KILLED, event.get().update().status().state()); EXPECT_CALL(exec, shutdown(_)) .Times(AtMost(1)); @@ -537,9 +531,9 @@ TEST_F(SchedulerTest, ShutdownExecutor) EXPECT_CALL(callbacks, connected()) .WillOnce(FutureSatisfy(&connected)); - scheduler::Mesos mesos( + Mesos mesos( master.get(), - DEFAULT_CREDENTIAL, + DEFAULT_V1_CREDENTIAL, lambda::bind(&Callbacks::connected, lambda::ref(callbacks)), lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)), lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1)); @@ -556,7 +550,7 @@ TEST_F(SchedulerTest, ShutdownExecutor) call.set_type(Call::SUBSCRIBE); Call::Subscribe* subscribe = call.mutable_subscribe(); - subscribe->mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO); + subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO); mesos.send(call); } @@ -565,7 +559,7 @@ TEST_F(SchedulerTest, ShutdownExecutor) AWAIT_READY(event); EXPECT_EQ(Event::SUBSCRIBED, event.get().type()); - FrameworkID id(event.get().subscribed().framework_id()); + v1::FrameworkID id(event.get().subscribed().framework_id()); event = events.get(); AWAIT_READY(event); @@ -578,8 +572,10 @@ TEST_F(SchedulerTest, ShutdownExecutor) EXPECT_CALL(exec, launchTask(_, _)) .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED)); - Offer offer = event.get().offers().offers(0); - TaskInfo taskInfo = createTask(offer, "", DEFAULT_EXECUTOR_ID); + v1::Offer offer = event.get().offers().offers(0); + + v1::TaskInfo taskInfo = + evolve(createTask(devolve(offer), "", DEFAULT_EXECUTOR_ID)); { Call call; @@ -589,8 +585,8 @@ TEST_F(SchedulerTest, ShutdownExecutor) Call::Accept* accept = call.mutable_accept(); accept->add_offer_ids()->CopyFrom(offer.id()); - Offer::Operation* operation = accept->add_operations(); - operation->set_type(Offer::Operation::LAUNCH); + v1::Offer::Operation* operation = accept->add_operations(); + operation->set_type(v1::Offer::Operation::LAUNCH); operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo); mesos.send(call); @@ -599,7 +595,7 @@ TEST_F(SchedulerTest, ShutdownExecutor) event = events.get(); AWAIT_READY(event); EXPECT_EQ(Event::UPDATE, event.get().type()); - EXPECT_EQ(TASK_FINISHED, event.get().update().status().state()); + EXPECT_EQ(v1::TASK_FINISHED, event.get().update().status().state()); Future<Nothing> shutdown; EXPECT_CALL(exec, shutdown(_)) @@ -611,20 +607,20 @@ TEST_F(SchedulerTest, ShutdownExecutor) call.set_type(Call::SHUTDOWN); Call::Shutdown* shutdown = call.mutable_shutdown(); - shutdown->mutable_executor_id()->CopyFrom(DEFAULT_EXECUTOR_ID); - shutdown->mutable_slave_id()->CopyFrom(offer.slave_id()); + shutdown->mutable_executor_id()->CopyFrom(DEFAULT_V1_EXECUTOR_ID); + shutdown->mutable_agent_id()->CopyFrom(offer.agent_id()); mesos.send(call); } AWAIT_READY(shutdown); - containerizer.destroy(id, DEFAULT_EXECUTOR_ID); + containerizer.destroy(devolve(id), DEFAULT_EXECUTOR_ID); // Executor termination results in a 'FAILURE' event. event = events.get(); AWAIT_READY(event); EXPECT_EQ(Event::FAILURE, event.get().type()); - ExecutorID executorId(DEFAULT_EXECUTOR_ID); + v1::ExecutorID executorId(DEFAULT_V1_EXECUTOR_ID); EXPECT_EQ(executorId, event.get().failure().executor_id()); Shutdown(); // Must shutdown before 'containerizer' gets deallocated. @@ -649,9 +645,9 @@ TEST_F(SchedulerTest, Teardown) EXPECT_CALL(callbacks, connected()) .WillOnce(FutureSatisfy(&connected)); - scheduler::Mesos mesos( + Mesos mesos( master.get(), - DEFAULT_CREDENTIAL, + DEFAULT_V1_CREDENTIAL, lambda::bind(&Callbacks::connected, lambda::ref(callbacks)), lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)), lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1)); @@ -668,7 +664,7 @@ TEST_F(SchedulerTest, Teardown) call.set_type(Call::SUBSCRIBE); Call::Subscribe* subscribe = call.mutable_subscribe(); - subscribe->mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO); + subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO); mesos.send(call); } @@ -677,7 +673,7 @@ TEST_F(SchedulerTest, Teardown) AWAIT_READY(event); EXPECT_EQ(Event::SUBSCRIBED, event.get().type()); - FrameworkID id(event.get().subscribed().framework_id()); + v1::FrameworkID id(event.get().subscribed().framework_id()); event = events.get(); AWAIT_READY(event); @@ -690,8 +686,10 @@ TEST_F(SchedulerTest, Teardown) EXPECT_CALL(exec, launchTask(_, _)) .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); - Offer offer = event.get().offers().offers(0); - TaskInfo taskInfo = createTask(offer, "", DEFAULT_EXECUTOR_ID); + v1::Offer offer = event.get().offers().offers(0); + + v1::TaskInfo taskInfo = + evolve(createTask(devolve(offer), "", DEFAULT_EXECUTOR_ID)); { Call call; @@ -701,8 +699,8 @@ TEST_F(SchedulerTest, Teardown) Call::Accept* accept = call.mutable_accept(); accept->add_offer_ids()->CopyFrom(offer.id()); - Offer::Operation* operation = accept->add_operations(); - operation->set_type(Offer::Operation::LAUNCH); + v1::Offer::Operation* operation = accept->add_operations(); + operation->set_type(v1::Offer::Operation::LAUNCH); operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo); mesos.send(call); @@ -711,7 +709,7 @@ TEST_F(SchedulerTest, Teardown) event = events.get(); AWAIT_READY(event); EXPECT_EQ(Event::UPDATE, event.get().type()); - EXPECT_EQ(TASK_RUNNING, event.get().update().status().state()); + EXPECT_EQ(v1::TASK_RUNNING, event.get().update().status().state()); Future<Nothing> shutdown; EXPECT_CALL(exec, shutdown(_)) @@ -745,9 +743,9 @@ TEST_F(SchedulerTest, Decline) EXPECT_CALL(callbacks, connected()) .WillOnce(FutureSatisfy(&connected)); - scheduler::Mesos mesos( + Mesos mesos( master.get(), - DEFAULT_CREDENTIAL, + DEFAULT_V1_CREDENTIAL, lambda::bind(&Callbacks::connected, lambda::ref(callbacks)), lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)), lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1)); @@ -764,7 +762,7 @@ TEST_F(SchedulerTest, Decline) call.set_type(Call::SUBSCRIBE); Call::Subscribe* subscribe = call.mutable_subscribe(); - subscribe->mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO); + subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO); mesos.send(call); } @@ -773,14 +771,14 @@ TEST_F(SchedulerTest, Decline) AWAIT_READY(event); EXPECT_EQ(Event::SUBSCRIBED, event.get().type()); - FrameworkID id(event.get().subscribed().framework_id()); + v1::FrameworkID id(event.get().subscribed().framework_id()); event = events.get(); AWAIT_READY(event); EXPECT_EQ(Event::OFFERS, event.get().type()); ASSERT_EQ(1, event.get().offers().offers().size()); - Offer offer = event.get().offers().offers(0); + v1::Offer offer = event.get().offers().offers(0); { Call call; call.mutable_framework_id()->CopyFrom(id); @@ -790,7 +788,7 @@ TEST_F(SchedulerTest, Decline) decline->add_offer_ids()->CopyFrom(offer.id()); // Set 0s filter to immediately get another offer. - Filters filters; + v1::Filters filters; filters.set_refuse_seconds(0); decline->mutable_filters()->CopyFrom(filters); @@ -823,9 +821,9 @@ TEST_F(SchedulerTest, Revive) EXPECT_CALL(callbacks, connected()) .WillOnce(FutureSatisfy(&connected)); - scheduler::Mesos mesos( + Mesos mesos( master.get(), - DEFAULT_CREDENTIAL, + DEFAULT_V1_CREDENTIAL, lambda::bind(&Callbacks::connected, lambda::ref(callbacks)), lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)), lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1)); @@ -842,7 +840,7 @@ TEST_F(SchedulerTest, Revive) call.set_type(Call::SUBSCRIBE); Call::Subscribe* subscribe = call.mutable_subscribe(); - subscribe->mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO); + subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO); mesos.send(call); } @@ -851,14 +849,14 @@ TEST_F(SchedulerTest, Revive) AWAIT_READY(event); EXPECT_EQ(Event::SUBSCRIBED, event.get().type()); - FrameworkID id(event.get().subscribed().framework_id()); + v1::FrameworkID id(event.get().subscribed().framework_id()); event = events.get(); AWAIT_READY(event); EXPECT_EQ(Event::OFFERS, event.get().type()); EXPECT_NE(0, event.get().offers().offers().size()); - Offer offer = event.get().offers().offers(0); + v1::Offer offer = event.get().offers().offers(0); { Call call; call.mutable_framework_id()->CopyFrom(id); @@ -868,7 +866,7 @@ TEST_F(SchedulerTest, Revive) decline->add_offer_ids()->CopyFrom(offer.id()); // Set 1hr filter to not immediately get another offer. - Filters filters; + v1::Filters filters; filters.set_refuse_seconds(Hours(1).secs()); decline->mutable_filters()->CopyFrom(filters); @@ -921,9 +919,9 @@ TEST_F(SchedulerTest, Message) EXPECT_CALL(callbacks, connected()) .WillOnce(FutureSatisfy(&connected)); - scheduler::Mesos mesos( + Mesos mesos( master.get(), - DEFAULT_CREDENTIAL, + DEFAULT_V1_CREDENTIAL, lambda::bind(&Callbacks::connected, lambda::ref(callbacks)), lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)), lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1)); @@ -940,7 +938,7 @@ TEST_F(SchedulerTest, Message) call.set_type(Call::SUBSCRIBE); Call::Subscribe* subscribe = call.mutable_subscribe(); - subscribe->mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO); + subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO); mesos.send(call); } @@ -949,7 +947,7 @@ TEST_F(SchedulerTest, Message) AWAIT_READY(event); EXPECT_EQ(Event::SUBSCRIBED, event.get().type()); - FrameworkID id(event.get().subscribed().framework_id()); + v1::FrameworkID id(event.get().subscribed().framework_id()); event = events.get(); AWAIT_READY(event); @@ -962,8 +960,10 @@ TEST_F(SchedulerTest, Message) EXPECT_CALL(exec, launchTask(_, _)) .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); - Offer offer = event.get().offers().offers(0); - TaskInfo taskInfo = createTask(offer, "", DEFAULT_EXECUTOR_ID); + v1::Offer offer = event.get().offers().offers(0); + + v1::TaskInfo taskInfo = + evolve(createTask(devolve(offer), "", DEFAULT_EXECUTOR_ID)); { Call call; @@ -973,8 +973,8 @@ TEST_F(SchedulerTest, Message) Call::Accept* accept = call.mutable_accept(); accept->add_offer_ids()->CopyFrom(offer.id()); - Offer::Operation* operation = accept->add_operations(); - operation->set_type(Offer::Operation::LAUNCH); + v1::Offer::Operation* operation = accept->add_operations(); + operation->set_type(v1::Offer::Operation::LAUNCH); operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo); mesos.send(call); @@ -983,7 +983,7 @@ TEST_F(SchedulerTest, Message) event = events.get(); AWAIT_READY(event); EXPECT_EQ(Event::UPDATE, event.get().type()); - EXPECT_EQ(TASK_RUNNING, event.get().update().status().state()); + EXPECT_EQ(v1::TASK_RUNNING, event.get().update().status().state()); Future<string> data; EXPECT_CALL(exec, frameworkMessage(_, _)) @@ -995,8 +995,8 @@ TEST_F(SchedulerTest, Message) call.set_type(Call::MESSAGE); Call::Message* message = call.mutable_message(); - message->mutable_slave_id()->CopyFrom(offer.slave_id()); - message->mutable_executor_id()->CopyFrom(DEFAULT_EXECUTOR_ID); + message->mutable_agent_id()->CopyFrom(offer.agent_id()); + message->mutable_executor_id()->CopyFrom(DEFAULT_V1_EXECUTOR_ID); message->set_data("hello world"); mesos.send(call); @@ -1019,9 +1019,9 @@ TEST_F(SchedulerTest, Request) EXPECT_CALL(callbacks, connected()) .WillOnce(FutureSatisfy(&connected)); - scheduler::Mesos mesos( + Mesos mesos( master.get(), - DEFAULT_CREDENTIAL, + DEFAULT_V1_CREDENTIAL, lambda::bind(&Callbacks::connected, lambda::ref(callbacks)), lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)), lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1)); @@ -1038,7 +1038,7 @@ TEST_F(SchedulerTest, Request) call.set_type(Call::SUBSCRIBE); Call::Subscribe* subscribe = call.mutable_subscribe(); - subscribe->mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO); + subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO); mesos.send(call); } @@ -1047,7 +1047,7 @@ TEST_F(SchedulerTest, Request) AWAIT_READY(event); EXPECT_EQ(Event::SUBSCRIBED, event.get().type()); - FrameworkID id(event.get().subscribed().framework_id()); + v1::FrameworkID id(event.get().subscribed().framework_id()); Future<Nothing> requestResources = FUTURE_DISPATCH(_, &MesosAllocatorProcess::requestResources); @@ -1073,326 +1073,6 @@ TEST_F(SchedulerTest, Request) // TODO(benh): Write test for sending Call::Acknowledgement through // master to slave when Event::Update was generated locally. - -class MesosSchedulerDriverTest : public MesosTest {}; - - -TEST_F(MesosSchedulerDriverTest, MetricsEndpoint) -{ - Try<PID<Master>> master = StartMaster(); - ASSERT_SOME(master); - - MockScheduler sched; - MesosSchedulerDriver driver( - &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); - - Future<Nothing> registered; - EXPECT_CALL(sched, registered(&driver, _, _)) - .WillOnce(FutureSatisfy(®istered)); - - ASSERT_EQ(DRIVER_RUNNING, driver.start()); - - AWAIT_READY(registered); - - Future<process::http::Response> response = - process::http::get(MetricsProcess::instance()->self(), "/snapshot"); - - AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response); - - EXPECT_SOME_EQ( - "application/json", - response.get().headers.get("Content-Type")); - - Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body); - - ASSERT_SOME(parse); - - JSON::Object metrics = parse.get(); - - EXPECT_EQ(1u, metrics.values.count("scheduler/event_queue_messages")); - EXPECT_EQ(1u, metrics.values.count("scheduler/event_queue_dispatches")); - - driver.stop(); - driver.join(); - - Shutdown(); -} - - -// This action calls driver stop() followed by abort(). -ACTION(StopAndAbort) -{ - arg0->stop(); - arg0->abort(); -} - - -// This test verifies that when the scheduler calls stop() before -// abort(), no pending acknowledgements are sent. -TEST_F(MesosSchedulerDriverTest, DropAckIfStopCalledBeforeAbort) -{ - Try<PID<Master>> master = StartMaster(); - ASSERT_SOME(master); - - MockExecutor exec(DEFAULT_EXECUTOR_ID); - TestContainerizer containerizer(&exec); - Try<PID<Slave>> slave = StartSlave(&containerizer); - ASSERT_SOME(slave); - - MockScheduler sched; - MesosSchedulerDriver driver( - &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); - - EXPECT_CALL(sched, registered(&driver, _, _)); - - EXPECT_CALL(sched, resourceOffers(&driver, _)) - .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 16, "*")) - .WillRepeatedly(Return()); // Ignore subsequent offers. - - // When an update is received, stop the driver and then abort it. - Future<Nothing> statusUpdate; - EXPECT_CALL(sched, statusUpdate(&driver, _)) - .WillOnce(DoAll(StopAndAbort(), - FutureSatisfy(&statusUpdate))); - - // Ensure no status update acknowledgements are sent from the driver - // to the master. - EXPECT_NO_FUTURE_CALLS( - mesos::scheduler::Call(), - mesos::scheduler::Call::ACKNOWLEDGE, - _ , - master.get()); - - EXPECT_CALL(exec, registered(_, _, _, _)); - - EXPECT_CALL(exec, launchTask(_, _)) - .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); - - EXPECT_CALL(exec, shutdown(_)) - .Times(AtMost(1)); - - driver.start(); - - AWAIT_READY(statusUpdate); - - // Settle the clock to ensure driver finishes processing the status - // update and sends acknowledgement if necessary. In this test it - // shouldn't send an acknowledgement. - Clock::pause(); - Clock::settle(); - - driver.stop(); - driver.join(); - - Shutdown(); -} - - -// Ensures that when a scheduler enables explicit acknowledgements -// on the driver, there are no implicit acknowledgements sent, and -// the call to 'acknowledgeStatusUpdate' sends the ack to the master. -TEST_F(MesosSchedulerDriverTest, ExplicitAcknowledgements) -{ - Try<PID<Master>> master = StartMaster(); - ASSERT_SOME(master); - - MockExecutor exec(DEFAULT_EXECUTOR_ID); - TestContainerizer containerizer(&exec); - Try<PID<Slave>> slave = StartSlave(&containerizer); - ASSERT_SOME(slave); - - MockScheduler sched; - MesosSchedulerDriver driver( - &sched, DEFAULT_FRAMEWORK_INFO, master.get(), false, DEFAULT_CREDENTIAL); - - EXPECT_CALL(sched, registered(&driver, _, _)); - - EXPECT_CALL(sched, resourceOffers(&driver, _)) - .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 16, "*")) - .WillRepeatedly(Return()); // Ignore subsequent offers. - - Future<TaskStatus> status; - EXPECT_CALL(sched, statusUpdate(&driver, _)) - .WillOnce(FutureArg<1>(&status)); - - // Ensure no status update acknowledgements are sent from the driver - // to the master until the explicit acknowledgement is sent. - EXPECT_NO_FUTURE_CALLS( - mesos::scheduler::Call(), - mesos::scheduler::Call::ACKNOWLEDGE, - _ , - master.get()); - - EXPECT_CALL(exec, registered(_, _, _, _)); - - EXPECT_CALL(exec, launchTask(_, _)) - .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); - - EXPECT_CALL(exec, shutdown(_)) - .Times(AtMost(1)); - - driver.start(); - - AWAIT_READY(status); - - // Settle the clock to ensure driver finishes processing the status - // update, we want to ensure that no implicit acknowledgement gets - // sent. - Clock::pause(); - Clock::settle(); - - // Now send the acknowledgement. - Future<mesos::scheduler::Call> acknowledgement = FUTURE_CALL( - mesos::scheduler::Call(), - mesos::scheduler::Call::ACKNOWLEDGE, - _, - master.get()); - - driver.acknowledgeStatusUpdate(status.get()); - - AWAIT_READY(acknowledgement); - - driver.stop(); - driver.join(); - - Shutdown(); -} - - -// This test ensures that when explicit acknowledgements are enabled, -// acknowledgements for master-generated updates are dropped by the -// driver. We test this by creating an invalid task that uses no -// resources. -TEST_F(MesosSchedulerDriverTest, ExplicitAcknowledgementsMasterGeneratedUpdate) -{ - Try<PID<Master>> master = StartMaster(); - ASSERT_SOME(master); - - Try<PID<Slave>> slave = StartSlave(); - ASSERT_SOME(slave); - - MockScheduler sched; - MesosSchedulerDriver driver( - &sched, DEFAULT_FRAMEWORK_INFO, master.get(), false, DEFAULT_CREDENTIAL); - - EXPECT_CALL(sched, registered(&driver, _, _)); - - Future<vector<Offer>> offers; - EXPECT_CALL(sched, resourceOffers(&driver, _)) - .WillOnce(FutureArg<1>(&offers)) - .WillRepeatedly(Return()); // Ignore subsequent offers. - - // Ensure no status update acknowledgements are sent to the master. - EXPECT_NO_FUTURE_CALLS( - mesos::scheduler::Call(), - mesos::scheduler::Call::ACKNOWLEDGE, - _ , - master.get()); - - driver.start(); - - AWAIT_READY(offers); - EXPECT_NE(0u, offers.get().size()); - - // Launch a task using no resources. - TaskInfo task; - task.set_name(""); - task.mutable_task_id()->set_value("1"); - task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id()); - task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO); - - vector<TaskInfo> tasks; - tasks.push_back(task); - - Future<TaskStatus> status; - EXPECT_CALL(sched, statusUpdate(&driver, _)) - .WillOnce(FutureArg<1>(&status)); - - driver.launchTasks(offers.get()[0].id(), tasks); - - AWAIT_READY(status); - ASSERT_EQ(TASK_ERROR, status.get().state()); - ASSERT_EQ(TaskStatus::SOURCE_MASTER, status.get().source()); - ASSERT_EQ(TaskStatus::REASON_TASK_INVALID, status.get().reason()); - - // Now send the acknowledgement. - driver.acknowledgeStatusUpdate(status.get()); - - // Settle the clock to ensure driver processes the acknowledgement, - // which should get dropped due to having come from the master. - Clock::pause(); - Clock::settle(); - - driver.stop(); - driver.join(); - - Shutdown(); -} - - -// This test ensures that the driver handles an empty slave id -// in an acknowledgement message by dropping it. The driver will -// log an error in this case (but we don't test for that). We -// generate a status with no slave id by performing reconciliation. -TEST_F(MesosSchedulerDriverTest, ExplicitAcknowledgementsUnsetSlaveID) -{ - Try<PID<Master>> master = StartMaster(); - ASSERT_SOME(master); - - MockScheduler sched; - MesosSchedulerDriver driver( - &sched, DEFAULT_FRAMEWORK_INFO, master.get(), false, DEFAULT_CREDENTIAL); - - Future<Nothing> registered; - EXPECT_CALL(sched, registered(&driver, _, _)) - .WillOnce(FutureSatisfy(®istered)); - - // Ensure no status update acknowledgements are sent to the master. - EXPECT_NO_FUTURE_CALLS( - mesos::scheduler::Call(), - mesos::scheduler::Call::ACKNOWLEDGE, - _ , - master.get()); - - driver.start(); - - AWAIT_READY(registered); - - Future<TaskStatus> update; - EXPECT_CALL(sched, statusUpdate(&driver, _)) - .WillOnce(FutureArg<1>(&update)); - - // Peform reconciliation without using a slave id. - vector<TaskStatus> statuses; - - TaskStatus status; - status.mutable_task_id()->set_value("foo"); - status.set_state(TASK_RUNNING); - - statuses.push_back(status); - - driver.reconcileTasks(statuses); - - AWAIT_READY(update); - ASSERT_EQ(TASK_LOST, update.get().state()); - ASSERT_EQ(TaskStatus::SOURCE_MASTER, update.get().source()); - ASSERT_EQ(TaskStatus::REASON_RECONCILIATION, update.get().reason()); - ASSERT_FALSE(update.get().has_slave_id()); - - // Now send the acknowledgement. - driver.acknowledgeStatusUpdate(update.get()); - - // Settle the clock to ensure driver processes the acknowledgement, - // which should get dropped due to the missing slave id. - Clock::pause(); - Clock::settle(); - - driver.stop(); - driver.join(); - - Shutdown(); -} - } // namespace tests { } // namespace internal { } // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/b923cb52/src/tests/slave_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp index cb5a01e..d55e9dd 100644 --- a/src/tests/slave_tests.cpp +++ b/src/tests/slave_tests.cpp @@ -29,8 +29,6 @@ #include <mesos/executor.hpp> #include <mesos/scheduler.hpp> -#include <mesos/scheduler/scheduler.hpp> - #include <process/clock.hpp> #include <process/future.hpp> #include <process/gmock.hpp> http://git-wip-us.apache.org/repos/asf/mesos/blob/b923cb52/src/v1/attributes.cpp ---------------------------------------------------------------------- diff --git a/src/v1/attributes.cpp b/src/v1/attributes.cpp new file mode 100644 index 0000000..37085be --- /dev/null +++ b/src/v1/attributes.cpp @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <iostream> +#include <vector> + +#include <glog/logging.h> + +#include <mesos/v1/attributes.hpp> +#include <mesos/v1/values.hpp> + +#include <stout/foreach.hpp> +#include <stout/strings.hpp> + +using std::ostream; +using std::string; +using std::vector; + +namespace mesos { +namespace v1 { + +std::ostream& operator << (std::ostream& stream, const Attribute& attribute) +{ + stream << attribute.name() << "="; + switch (attribute.type()) { + case Value::SCALAR: stream << attribute.scalar(); break; + case Value::RANGES: stream << attribute.ranges(); break; + case Value::SET: stream << attribute.set(); break; + case Value::TEXT: stream << attribute.text(); break; + default: + LOG(FATAL) << "Unexpected Value type: " << attribute.type(); + break; + } + + return stream; +} + + +bool Attributes::operator == (const Attributes& that) const +{ + if (size() != that.size()) { + return false; + } + + foreach (const Attribute& attribute, attributes) { + Option<Attribute> maybeAttribute = that.get(attribute); + if (maybeAttribute.isNone()) { + return false; + } + const Attribute& thatAttribute = maybeAttribute.get(); + switch (attribute.type()) { + case Value::SCALAR: + if (!(attribute.scalar() == thatAttribute.scalar())) { + return false; + } + break; + case Value::RANGES: + if (!(attribute.ranges() == thatAttribute.ranges())) { + return false; + } + break; + case Value::TEXT: + if (!(attribute.text() == thatAttribute.text())) { + return false; + } + break; + case Value::SET: + LOG(FATAL) << "Sets not supported for attributes"; + } + } + + return true; +} + + +const Option<Attribute> Attributes::get(const Attribute& thatAttribute) const +{ + foreach (const Attribute& attribute, attributes) { + if (attribute.name() == thatAttribute.name() && + attribute.type() == thatAttribute.type()) { + return attribute; + } + } + + return None(); +} + + +Attribute Attributes::parse(const string& name, const string& text) +{ + Attribute attribute; + Try<Value> result = internal::values::parse(text); + + if (result.isError()) { + LOG(FATAL) << "Failed to parse attribute " << name + << " text " << text + << " error " << result.error(); + } else { + Value value = result.get(); + attribute.set_name(name); + + if (value.type() == Value::RANGES) { + attribute.set_type(Value::RANGES); + attribute.mutable_ranges()->MergeFrom(value.ranges()); + } else if (value.type() == Value::TEXT) { + attribute.set_type(Value::TEXT); + attribute.mutable_text()->MergeFrom(value.text()); + } else if (value.type() == Value::SCALAR) { + attribute.set_type(Value::SCALAR); + attribute.mutable_scalar()->MergeFrom(value.scalar()); + } else { + LOG(FATAL) << "Bad type for attribute " << name + << " text " << text + << " type " << value.type(); + } + } + + return attribute; +} + + +Attributes Attributes::parse(const string& s) +{ + // Tokenize and parse the value of "attributes". + Attributes attributes; + + vector<string> tokens = strings::tokenize(s, ";\n"); + + for (size_t i = 0; i < tokens.size(); i++) { + const vector<string>& pairs = strings::split(tokens[i], ":", 2); + if (pairs.size() != 2 || pairs[0].empty() || pairs[1].empty()) { + LOG(FATAL) << "Invalid attribute key:value pair '" << tokens[i] << "'"; + } + + attributes.add(parse(pairs[0], pairs[1])); + } + + return attributes; +} + + +bool Attributes::isValid(const Attribute& attribute) +{ + if (!attribute.has_name() || + attribute.name() == "" || + !attribute.has_type() || + !Value::Type_IsValid(attribute.type())) { + return false; + } + + if (attribute.type() == Value::SCALAR) { + return attribute.has_scalar(); + } else if (attribute.type() == Value::RANGES) { + return attribute.has_ranges(); + } else if (attribute.type() == Value::TEXT) { + return attribute.has_text(); + } else if (attribute.type() == Value::SET) { + // Attributes doesn't support set. + return false; + } + + return false; +} + + +template <> +Value::Scalar Attributes::get( + const string& name, + const Value::Scalar& scalar) const +{ + foreach (const Attribute& attribute, attributes) { + if (attribute.name() == name && + attribute.type() == Value::SCALAR) { + return attribute.scalar(); + } + } + + return scalar; +} + + +template <> +Value::Ranges Attributes::get( + const string& name, + const Value::Ranges& ranges) const +{ + foreach (const Attribute& attribute, attributes) { + if (attribute.name() == name && + attribute.type() == Value::RANGES) { + return attribute.ranges(); + } + } + + return ranges; +} + + +template <> +Value::Text Attributes::get( + const string& name, + const Value::Text& text) const +{ + foreach (const Attribute& attribute, attributes) { + if (attribute.name() == name && + attribute.type() == Value::TEXT) { + return attribute.text(); + } + } + + return text; +} + +} // namespace v1 { +} // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/b923cb52/src/v1/mesos.cpp ---------------------------------------------------------------------- diff --git a/src/v1/mesos.cpp b/src/v1/mesos.cpp new file mode 100644 index 0000000..e42d04c --- /dev/null +++ b/src/v1/mesos.cpp @@ -0,0 +1,363 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <mesos/v1/attributes.hpp> +#include <mesos/v1/mesos.hpp> +#include <mesos/v1/resources.hpp> + +namespace mesos { +namespace v1 { + +// TODO(vinod): Ensure that these operators do not go out of sync +// when new fields are added to the protobufs (MESOS-2487). + +bool operator == (const CommandInfo& left, const CommandInfo& right) +{ + if (left.uris().size() != right.uris().size()) { + return false; + } + + // TODO(vinod): Factor out the comparison for repeated fields. + for (int i = 0; i < left.uris().size(); i++) { + bool found = false; + for (int j = 0; j < right.uris().size(); j++) { + if (left.uris().Get(i) == right.uris().Get(j)) { + found = true; + break; + } + } + if (!found) { + return false; + } + } + + if (left.arguments().size() != right.arguments().size()) { + return false; + } + + // The order of argv is important. + for (int i = 0; i < left.arguments().size(); i++) { + if (left.arguments().Get(i) != right.arguments().Get(i)) { + return false; + } + } + + // NOTE: We are not validating CommandInfo::ContainerInfo here + // because it is being deprecated in favor of ContainerInfo. + // TODO(vinod): Kill the above comment when + // CommandInfo::ContainerInfo is removed. + return left.environment() == right.environment() && + left.value() == right.value() && + left.user() == right.user() && + left.shell() == right.shell(); +} + + +bool operator == (const CommandInfo::URI& left, const CommandInfo::URI& right) +{ + return left.value() == right.value() && + left.executable() == right.executable() && + left.extract() == right.extract(); +} + + +bool operator == (const Credential& left, const Credential& right) +{ + return left.principal() == right.principal() && + left.secret() == right.secret(); +} + + +bool operator == ( + const Environment::Variable& left, + const Environment::Variable& right) +{ + return left.name() == right.name() && left.value() == right.value(); +} + + +bool operator == (const Environment& left, const Environment& right) +{ + // Order of variables is not important. + if (left.variables().size() != right.variables().size()) { + return false; + } + + for (int i = 0; i < left.variables().size(); i++) { + bool found = false; + for (int j = 0; j < right.variables().size(); j++) { + if (left.variables().Get(i) == right.variables().Get(j)) { + found = true; + break; + } + } + if (!found) { + return false; + } + } + + return true; +} + + +bool operator == (const Volume& left, const Volume& right) +{ + return left.container_path() == right.container_path() && + left.host_path() == right.host_path() && + left.mode() == right.mode(); +} + + +// TODO(bmahler): Leverage process::http::URL for equality. +bool operator == (const URL& left, const URL& right) +{ + return left.SerializeAsString() == right.SerializeAsString(); +} + + +bool operator == ( + const ContainerInfo::DockerInfo::PortMapping& left, + const ContainerInfo::DockerInfo::PortMapping& right) +{ + return left.host_port() == right.host_port() && + left.container_port() == right.container_port() && + left.protocol() == right.protocol(); +} + + +bool operator == (const Parameter& left, const Parameter& right) +{ + return left.key() == right.key() && left.value() == right.value(); +} + + +bool operator == ( + const ContainerInfo::DockerInfo& left, + const ContainerInfo::DockerInfo& right) +{ + // Order of port mappings is not important. + if (left.port_mappings().size() != right.port_mappings().size()) { + return false; + } + + for (int i = 0; i < left.port_mappings().size(); i++) { + bool found = false; + for (int j = 0; j < right.port_mappings().size(); j++) { + if (left.port_mappings().Get(i) == right.port_mappings().Get(j)) { + found = true; + break; + } + } + if (!found) { + return false; + } + } + + // Order of parameters is not important. + if (left.parameters().size() != right.parameters().size()) { + return false; + } + + for (int i = 0; i < left.parameters().size(); i++) { + bool found = false; + for (int j = 0; j < right.parameters().size(); j++) { + if (left.parameters().Get(i) == right.parameters().Get(j)) { + found = true; + break; + } + } + if (!found) { + return false; + } + } + + return left.image() == right.image() && + left.network() == right.network() && + left.privileged() == right.privileged() && + left.force_pull_image() == right.force_pull_image(); +} + + +bool operator == (const ContainerInfo& left, const ContainerInfo& right) +{ + // Order of volumes is not important. + if (left.volumes().size() != right.volumes().size()) { + return false; + } + + for (int i = 0; i < left.volumes().size(); i++) { + bool found = false; + for (int j = 0; j < right.volumes().size(); j++) { + if (left.volumes().Get(i) == right.volumes().Get(j)) { + found = true; + break; + } + } + if (!found) { + return false; + } + } + + return left.type() == right.type() && + left.hostname() == right.hostname() && + left.docker() == right.docker(); +} + + +bool operator == (const Port& left, const Port& right) +{ + return left.number() == right.number() && + left.name() == right.name() && + left.protocol() == right.protocol(); +} + + +bool operator == (const Ports& left, const Ports& right) +{ + // Order of ports is not important. + if (left.ports().size() != right.ports().size()) { + return false; + } + + for (int i = 0; i < left.ports().size(); i++) { + bool found = false; + for (int j = 0; j < right.ports().size(); j++) { + if (left.ports().Get(i) == right.ports().Get(j)) { + found = true; + break; + } + } + if (!found) { + return false; + } + } + + return true; +} + + +bool operator == (const Label& left, const Label& right) +{ + return left.key() == right.key() && left.value() == right.value(); +} + + +bool operator == (const Labels& left, const Labels& right) +{ + // Order of labels is not important. + if (left.labels().size() != right.labels().size()) { + return false; + } + + for (int i = 0; i < left.labels().size(); i++) { + bool found = false; + for (int j = 0; j < right.labels().size(); j++) { + if (left.labels().Get(i) == right.labels().Get(j)) { + found = true; + break; + } + } + if (!found) { + return false; + } + } + + return true; +} + + +bool operator == (const DiscoveryInfo& left, const DiscoveryInfo& right) +{ + return left.visibility() == right.visibility() && + left.name() == right.name() && + left.environment() == right.environment() && + left.location() == right.location() && + left.version() == right.version() && + left.ports() == right.ports() && + left.labels() == right.labels(); +} + + +bool operator == (const ExecutorInfo& left, const ExecutorInfo& right) +{ + return left.executor_id() == right.executor_id() && + left.data() == right.data() && + Resources(left.resources()) == Resources(right.resources()) && + left.command() == right.command() && + left.framework_id() == right.framework_id() && + left.name() == right.name() && + left.source() == right.source() && + left.container() == right.container() && + left.discovery() == right.discovery(); +} + + +bool operator == (const MasterInfo& left, const MasterInfo& right) +{ + return left.id() == right.id() && + left.ip() == right.ip() && + left.port() == right.port() && + left.pid() == right.pid() && + left.hostname() == right.hostname() && + left.version() == right.version(); +} + + +bool operator == ( + const ResourceStatistics& left, + const ResourceStatistics& right) +{ + return left.SerializeAsString() == right.SerializeAsString(); +} + + +bool operator == (const AgentInfo& left, const AgentInfo& right) +{ + return left.hostname() == right.hostname() && + Resources(left.resources()) == Resources(right.resources()) && + Attributes(left.attributes()) == Attributes(right.attributes()) && + left.id() == right.id() && + left.port() == right.port(); +} + + +// TODO(bmahler): Use SerializeToString here? +bool operator == (const TaskStatus& left, const TaskStatus& right) +{ + return left.task_id() == right.task_id() && + left.state() == right.state() && + left.data() == right.data() && + left.message() == right.message() && + left.agent_id() == right.agent_id() && + left.timestamp() == right.timestamp() && + left.executor_id() == right.executor_id() && + left.healthy() == right.healthy() && + left.source() == right.source() && + left.reason() == right.reason() && + left.uuid() == right.uuid(); +} + + +bool operator != (const TaskStatus& left, const TaskStatus& right) +{ + return !(left == right); +} + + +} // namespace v1 { +} // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/b923cb52/src/v1/resources.cpp ---------------------------------------------------------------------- diff --git a/src/v1/resources.cpp b/src/v1/resources.cpp new file mode 100644 index 0000000..fd8df51 --- /dev/null +++ b/src/v1/resources.cpp @@ -0,0 +1,1264 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <stdint.h> + +#include <set> +#include <string> +#include <vector> + +#include <glog/logging.h> + +#include <mesos/v1/mesos.hpp> +#include <mesos/v1/resources.hpp> +#include <mesos/v1/values.hpp> + +#include <stout/foreach.hpp> +#include <stout/hashmap.hpp> +#include <stout/lambda.hpp> +#include <stout/strings.hpp> + +using std::map; +using std::ostream; +using std::set; +using std::string; +using std::vector; + + +namespace mesos { +namespace v1 { + +///////////////////////////////////////////////// +// Helper functions. +///////////////////////////////////////////////// + +bool operator == ( + const Resource::ReservationInfo& left, + const Resource::ReservationInfo& right) +{ + return left.principal() == right.principal(); +} + + +bool operator != ( + const Resource::ReservationInfo& left, + const Resource::ReservationInfo& right) +{ + return !(left == right); +} + + +bool operator == ( + const Resource::DiskInfo& left, + const Resource::DiskInfo& right) +{ + // NOTE: We ignore 'volume' inside DiskInfo when doing comparison + // because it describes how this resource will be used which has + // nothing to do with the Resource object itself. A framework can + // use this resource and specify different 'volume' every time it + // uses it. + if (left.has_persistence() != right.has_persistence()) { + return false; + } + + if (left.has_persistence()) { + return left.persistence().id() == right.persistence().id(); + } + + return true; +} + + +bool operator != ( + const Resource::DiskInfo& left, + const Resource::DiskInfo& right) +{ + return !(left == right); +} + + +bool operator == (const Resource& left, const Resource& right) +{ + if (left.name() != right.name() || + left.type() != right.type() || + left.role() != right.role()) { + return false; + } + + // Check ReservationInfo. + if (left.has_reservation() != right.has_reservation()) { + return false; + } + + if (left.has_reservation() && left.reservation() != right.reservation()) { + return false; + } + + // Check DiskInfo. + if (left.has_disk() != right.has_disk()) { + return false; + } + + if (left.has_disk() && left.disk() != right.disk()) { + return false; + } + + // Check RevocableInfo. + if (left.has_revocable() != right.has_revocable()) { + return false; + } + + if (left.type() == Value::SCALAR) { + return left.scalar() == right.scalar(); + } else if (left.type() == Value::RANGES) { + return left.ranges() == right.ranges(); + } else if (left.type() == Value::SET) { + return left.set() == right.set(); + } else { + return false; + } +} + + +bool operator != (const Resource& left, const Resource& right) +{ + return !(left == right); +} + + +namespace internal { + +// Tests if we can add two Resource objects together resulting in one +// valid Resource object. For example, two Resource objects with +// different name, type or role are not addable. +static bool addable(const Resource& left, const Resource& right) +{ + if (left.name() != right.name() || + left.type() != right.type() || + left.role() != right.role()) { + return false; + } + + // Check ReservationInfo. + if (left.has_reservation() != right.has_reservation()) { + return false; + } + + if (left.has_reservation() && left.reservation() != right.reservation()) { + return false; + } + + // Check DiskInfo. + if (left.has_disk() != right.has_disk()) { + return false; + } + + if (left.has_disk() && left.disk() != right.disk()) { + return false; + } + + // TODO(jieyu): Even if two Resource objects with DiskInfo have the + // same persistence ID, they cannot be added together. In fact, this + // shouldn't happen if we do not add resources from different + // namespaces (e.g., across slave). Consider adding a warning. + if (left.has_disk() && left.disk().has_persistence()) { + return false; + } + + // Check RevocableInfo. + if (left.has_revocable() != right.has_revocable()) { + return false; + } + + return true; +} + + +// Tests if we can subtract "right" from "left" resulting in one valid +// Resource object. For example, two Resource objects with different +// name, type or role are not subtractable. +// NOTE: Set subtraction is always well defined, it does not require +// 'right' to be contained within 'left'. For example, assuming that +// "left = {1, 2}" and "right = {2, 3}", "left" and "right" are +// subtractable because "left - right = {1}". However, "left" does not +// contain "right". +static bool subtractable(const Resource& left, const Resource& right) +{ + if (left.name() != right.name() || + left.type() != right.type() || + left.role() != right.role()) { + return false; + } + + // Check ReservationInfo. + if (left.has_reservation() != right.has_reservation()) { + return false; + } + + if (left.has_reservation() && left.reservation() != right.reservation()) { + return false; + } + + // Check DiskInfo. + if (left.has_disk() != right.has_disk()) { + return false; + } + + if (left.has_disk() && left.disk() != right.disk()) { + return false; + } + + // NOTE: For Resource objects that have DiskInfo, we can only do + // subtraction if they are equal. + if (left.has_disk() && left.disk().has_persistence() && left != right) { + return false; + } + + // Check RevocableInfo. + if (left.has_revocable() != right.has_revocable()) { + return false; + } + + return true; +} + + +// Tests if "right" is contained in "left". +static bool contains(const Resource& left, const Resource& right) +{ + // NOTE: This is a necessary condition for 'contains'. + // 'subtractable' will verify name, role, type, ReservationInfo, + // DiskInfo and RevocableInfo compatibility. + if (!subtractable(left, right)) { + return false; + } + + if (left.type() == Value::SCALAR) { + return right.scalar() <= left.scalar(); + } else if (left.type() == Value::RANGES) { + return right.ranges() <= left.ranges(); + } else if (left.type() == Value::SET) { + return right.set() <= left.set(); + } else { + return false; + } +} + +} // namespace internal { + + +Resource& operator += (Resource& left, const Resource& right) +{ + if (left.type() == Value::SCALAR) { + *left.mutable_scalar() += right.scalar(); + } else if (left.type() == Value::RANGES) { + *left.mutable_ranges() += right.ranges(); + } else if (left.type() == Value::SET) { + *left.mutable_set() += right.set(); + } + + return left; +} + + +Resource operator + (const Resource& left, const Resource& right) +{ + Resource result = left; + result += right; + return result; +} + + +Resource& operator -= (Resource& left, const Resource& right) +{ + if (left.type() == Value::SCALAR) { + *left.mutable_scalar() -= right.scalar(); + } else if (left.type() == Value::RANGES) { + *left.mutable_ranges() -= right.ranges(); + } else if (left.type() == Value::SET) { + *left.mutable_set() -= right.set(); + } + + return left; +} + + +Resource operator - (const Resource& left, const Resource& right) +{ + Resource result = left; + result -= right; + return result; +} + + +///////////////////////////////////////////////// +// Public static functions. +///////////////////////////////////////////////// + + +Try<Resource> Resources::parse( + const string& name, + const string& value, + const string& role) +{ + Try<Value> result = internal::values::parse(value); + if (result.isError()) { + return Error( + "Failed to parse resource " + name + + " value " + value + " error " + result.error()); + } + + Resource resource; + + Value _value = result.get(); + resource.set_name(name); + resource.set_role(role); + + if (_value.type() == Value::SCALAR) { + resource.set_type(Value::SCALAR); + resource.mutable_scalar()->CopyFrom(_value.scalar()); + } else if (_value.type() == Value::RANGES) { + resource.set_type(Value::RANGES); + resource.mutable_ranges()->CopyFrom(_value.ranges()); + } else if (_value.type() == Value::SET) { + resource.set_type(Value::SET); + resource.mutable_set()->CopyFrom(_value.set()); + } else { + return Error( + "Bad type for resource " + name + " value " + value + + " type " + Value::Type_Name(_value.type())); + } + + return resource; +} + + +// TODO(wickman) It is possible for Resources::ostream<< to produce +// unparseable resources, i.e. those with +// ReservationInfo/DiskInfo/RevocableInfo. +Try<Resources> Resources::parse( + const string& text, + const string& defaultRole) +{ + Resources resources; + hashmap<string, Value_Type> nameTypes; + + foreach (const string& token, strings::tokenize(text, ";")) { + vector<string> pair = strings::tokenize(token, ":"); + if (pair.size() != 2) { + return Error("Bad value for resources, missing or extra ':' in " + token); + } + + string name; + string role; + size_t openParen = pair[0].find("("); + if (openParen == string::npos) { + name = strings::trim(pair[0]); + role = defaultRole; + } else { + size_t closeParen = pair[0].find(")"); + if (closeParen == string::npos || closeParen < openParen) { + return Error( + "Bad value for resources, mismatched parentheses in " + token); + } + + name = strings::trim(pair[0].substr(0, openParen)); + + role = strings::trim(pair[0].substr( + openParen + 1, + closeParen - openParen - 1)); + } + + Try<Resource> resource = Resources::parse(name, pair[1], role); + if (resource.isError()) { + return Error(resource.error()); + } + + if (nameTypes.contains(name) && nameTypes[name] != resource.get().type()) { + return Error( + "Resources with the same name ('" + name + "') but different types " + "are not allowed"); + } else if (!nameTypes.contains(name)) { + nameTypes[name] = resource.get().type(); + } + + resources += resource.get(); + } + + return resources; +} + + +Option<Error> Resources::validate(const Resource& resource) +{ + if (resource.name().empty()) { + return Error("Empty resource name"); + } + + if (!Value::Type_IsValid(resource.type())) { + return Error("Invalid resource type"); + } + + if (resource.type() == Value::SCALAR) { + if (!resource.has_scalar() || + resource.has_ranges() || + resource.has_set()) { + return Error("Invalid scalar resource"); + } + + if (resource.scalar().value() < 0) { + return Error("Invalid scalar resource: value < 0"); + } + } else if (resource.type() == Value::RANGES) { + if (resource.has_scalar() || + !resource.has_ranges() || + resource.has_set()) { + return Error("Invalid ranges resource"); + } + + for (int i = 0; i < resource.ranges().range_size(); i++) { + const Value::Range& range = resource.ranges().range(i); + + // Ensure the range make sense (isn't inverted). + if (range.begin() > range.end()) { + return Error("Invalid ranges resource: begin > end"); + } + + // Ensure ranges don't overlap (but not necessarily coalesced). + for (int j = i + 1; j < resource.ranges().range_size(); j++) { + if (range.begin() <= resource.ranges().range(j).begin() && + resource.ranges().range(j).begin() <= range.end()) { + return Error("Invalid ranges resource: overlapping ranges"); + } + } + } + } else if (resource.type() == Value::SET) { + if (resource.has_scalar() || + resource.has_ranges() || + !resource.has_set()) { + return Error("Invalid set resource"); + } + + for (int i = 0; i < resource.set().item_size(); i++) { + const string& item = resource.set().item(i); + + // Ensure no duplicates. + for (int j = i + 1; j < resource.set().item_size(); j++) { + if (item == resource.set().item(j)) { + return Error("Invalid set resource: duplicated elements"); + } + } + } + } else { + // Resource doesn't support TEXT or other value types. + return Error("Unsupported resource type"); + } + + // Checks for 'disk' resource. + if (resource.has_disk() && resource.name() != "disk") { + return Error( + "DiskInfo should not be set for " + resource.name() + " resource"); + } + + // Checks for the invalid state of (role, reservation) pair. + if (resource.role() == "*" && resource.has_reservation()) { + return Error( + "Invalid reservation: role \"*\" cannot be dynamically reserved"); + } + + return None(); +} + + +Option<Error> Resources::validate( + const google::protobuf::RepeatedPtrField<Resource>& resources) +{ + foreach (const Resource& resource, resources) { + Option<Error> error = validate(resource); + if (error.isSome()) { + return Error( + "Resource '" + stringify(resource) + + "' is invalid: " + error.get().message); + } + } + + return None(); +} + + +bool Resources::isEmpty(const Resource& resource) +{ + if (resource.type() == Value::SCALAR) { + return resource.scalar().value() == 0; + } else if (resource.type() == Value::RANGES) { + return resource.ranges().range_size() == 0; + } else if (resource.type() == Value::SET) { + return resource.set().item_size() == 0; + } else { + return false; + } +} + + +bool Resources::isPersistentVolume(const Resource& resource) +{ + return resource.has_disk() && resource.disk().has_persistence(); +} + + +bool Resources::isReserved( + const Resource& resource, + const Option<std::string>& role) +{ + if (role.isSome()) { + return !isUnreserved(resource) && role.get() == resource.role(); + } else { + return !isUnreserved(resource); + } +} + + +bool Resources::isUnreserved(const Resource& resource) +{ + return resource.role() == "*" && !resource.has_reservation(); +} + + +bool Resources::isDynamicallyReserved(const Resource& resource) +{ + return resource.has_reservation(); +} + + +bool Resources::isRevocable(const Resource& resource) +{ + return resource.has_revocable(); +} + +///////////////////////////////////////////////// +// Public member functions. +///////////////////////////////////////////////// + + +Resources::Resources(const Resource& resource) +{ + // NOTE: Invalid and zero Resource object will be ignored. + *this += resource; +} + + +Resources::Resources(const vector<Resource>& _resources) +{ + foreach (const Resource& resource, _resources) { + // NOTE: Invalid and zero Resource objects will be ignored. + *this += resource; + } +} + + +Resources::Resources( + const google::protobuf::RepeatedPtrField<Resource>& _resources) +{ + foreach (const Resource& resource, _resources) { + // NOTE: Invalid and zero Resource objects will be ignored. + *this += resource; + } +} + + +bool Resources::contains(const Resources& that) const +{ + Resources remaining = *this; + + foreach (const Resource& resource, that.resources) { + // NOTE: We use _contains because Resources only contain valid + // Resource objects, and we don't want the performance hit of the + // validity check. + if (!remaining._contains(resource)) { + return false; + } + + remaining -= resource; + } + + return true; +} + + +bool Resources::contains(const Resource& that) const +{ + // NOTE: We must validate 'that' because invalid resources can lead + // to false positives here (e.g., "cpus:-1" will return true). This + // is because 'contains' assumes resources are valid. + return validate(that).isNone() && _contains(that); +} + + +Resources Resources::filter( + const lambda::function<bool(const Resource&)>& predicate) const +{ + Resources result; + foreach (const Resource& resource, resources) { + if (predicate(resource)) { + result += resource; + } + } + return result; +} + + +hashmap<string, Resources> Resources::reserved() const +{ + hashmap<string, Resources> result; + + foreach (const Resource& resource, resources) { + if (isReserved(resource)) { + result[resource.role()] += resource; + } + } + + return result; +} + + +Resources Resources::reserved(const string& role) const +{ + return filter(lambda::bind(isReserved, lambda::_1, role)); +} + + +Resources Resources::unreserved() const +{ + return filter(isUnreserved); +} + + +Resources Resources::persistentVolumes() const +{ + return filter(isPersistentVolume); +} + + +Resources Resources::revocable() const +{ + return filter(isRevocable); +} + + +Resources Resources::flatten( + const string& role, + const Option<Resource::ReservationInfo>& reservation) const +{ + Resources flattened; + + foreach (Resource resource, resources) { + resource.set_role(role); + if (reservation.isNone()) { + resource.clear_reservation(); + } else { + resource.mutable_reservation()->CopyFrom(reservation.get()); + } + flattened += resource; + } + + return flattened; +} + + +// A predicate that returns true for any resource. +static bool any(const Resource&) { return true; } + + +Option<Resources> Resources::find(const Resource& target) const +{ + Resources found; + Resources total = *this; + Resources remaining = Resources(target).flatten(); + + // First look in the target role, then unreserved, then any remaining role. + // TODO(mpark): Use a lambda for 'any' instead once we get full C++11. + vector<lambda::function<bool(const Resource&)>> predicates = { + lambda::bind(isReserved, lambda::_1, target.role()), + isUnreserved, + any + }; + + foreach (const auto& predicate, predicates) { + foreach (const Resource& resource, total.filter(predicate)) { + // Need to flatten to ignore the roles in contains(). + Resources flattened = Resources(resource).flatten(); + + if (flattened.contains(remaining)) { + // Done! + if (!resource.has_reservation()) { + return found + remaining.flatten(resource.role()); + } else { + return found + + remaining.flatten(resource.role(), resource.reservation()); + } + } else if (remaining.contains(flattened)) { + found += resource; + total -= resource; + remaining -= flattened; + break; + } + } + } + + return None(); +} + + +Option<Resources> Resources::find(const Resources& targets) const +{ + Resources total; + + foreach (const Resource& target, targets) { + Option<Resources> found = find(target); + + // Each target needs to be found! + if (found.isNone()) { + return None(); + } + + total += found.get(); + } + + return total; +} + + +Try<Resources> Resources::apply(const Offer::Operation& operation) const +{ + Resources result = *this; + + switch (operation.type()) { + case Offer::Operation::LAUNCH: + // Launch operation does not alter the offered resources. + break; + + case Offer::Operation::RESERVE: { + Option<Error> error = validate(operation.reserve().resources()); + if (error.isSome()) { + return Error("Invalid RESERVE Operation: " + error.get().message); + } + + foreach (const Resource& reserved, operation.reserve().resources()) { + if (!Resources::isReserved(reserved)) { + return Error("Invalid RESERVE Operation: Resource must be reserved"); + } else if (!reserved.has_reservation()) { + return Error("Invalid RESERVE Operation: Missing 'reservation'"); + } + + Resources unreserved = Resources(reserved).flatten(); + + if (!result.contains(unreserved)) { + return Error("Invalid RESERVE Operation: " + stringify(result) + + " does not contain " + stringify(unreserved)); + } + + result -= unreserved; + result += reserved; + } + break; + } + + case Offer::Operation::UNRESERVE: { + Option<Error> error = validate(operation.unreserve().resources()); + if (error.isSome()) { + return Error("Invalid UNRESERVE Operation: " + error.get().message); + } + + foreach (const Resource& reserved, operation.unreserve().resources()) { + if (!Resources::isReserved(reserved)) { + return Error("Invalid UNRESERVE Operation: Resource is not reserved"); + } else if (!reserved.has_reservation()) { + return Error("Invalid UNRESERVE Operation: Missing 'reservation'"); + } + + if (!result.contains(reserved)) { + return Error("Invalid UNRESERVE Operation: " + stringify(result) + + " does not contain " + stringify(reserved)); + } + + Resources unreserved = Resources(reserved).flatten(); + + result -= reserved; + result += unreserved; + } + break; + } + + case Offer::Operation::CREATE: { + Option<Error> error = validate(operation.create().volumes()); + if (error.isSome()) { + return Error("Invalid CREATE Operation: " + error.get().message); + } + + foreach (const Resource& volume, operation.create().volumes()) { + if (!volume.has_disk()) { + return Error("Invalid CREATE Operation: Missing 'disk'"); + } else if (!volume.disk().has_persistence()) { + return Error("Invalid CREATE Operation: Missing 'persistence'"); + } + + // Strip the disk info so that we can subtract it from the + // original resources. + // TODO(jieyu): Non-persistent volumes are not supported for + // now. Persistent volumes can only be be created from regular + // disk resources. Revisit this once we start to support + // non-persistent volumes. + Resource stripped = volume; + stripped.clear_disk(); + + if (!result.contains(stripped)) { + return Error("Invalid CREATE Operation: Insufficient disk resources"); + } + + result -= stripped; + result += volume; + } + break; + } + + case Offer::Operation::DESTROY: { + Option<Error> error = validate(operation.destroy().volumes()); + if (error.isSome()) { + return Error("Invalid DESTROY Operation: " + error.get().message); + } + + foreach (const Resource& volume, operation.destroy().volumes()) { + if (!volume.has_disk()) { + return Error("Invalid DESTROY Operation: Missing 'disk'"); + } else if (!volume.disk().has_persistence()) { + return Error("Invalid DESTROY Operation: Missing 'persistence'"); + } + + if (!result.contains(volume)) { + return Error( + "Invalid DESTROY Operation: Persistent volume does not exist"); + } + + Resource stripped = volume; + stripped.clear_disk(); + + result -= volume; + result += stripped; + } + break; + } + + default: + return Error("Unknown offer operation " + stringify(operation.type())); + } + + // This is a sanity check to ensure the amount of each type of + // resource does not change. + // TODO(jieyu): Currently, we only check known resource types like + // cpus, mem, disk, ports, etc. We should generalize this. + CHECK(result.cpus() == cpus() && + result.mem() == mem() && + result.disk() == disk() && + result.ports() == ports()); + + return result; +} + + +template <> +Option<Value::Scalar> Resources::get(const string& name) const +{ + Value::Scalar total; + bool found = false; + + foreach (const Resource& resource, resources) { + if (resource.name() == name && + resource.type() == Value::SCALAR) { + total += resource.scalar(); + found = true; + } + } + + if (found) { + return total; + } + + return None(); +} + + +template <> +Option<Value::Set> Resources::get(const string& name) const +{ + Value::Set total; + bool found = false; + + foreach (const Resource& resource, resources) { + if (resource.name() == name && + resource.type() == Value::SET) { + total += resource.set(); + found = true; + } + } + + if (found) { + return total; + } + + return None(); +} + + +template <> +Option<Value::Ranges> Resources::get(const string& name) const +{ + Value::Ranges total; + bool found = false; + + foreach (const Resource& resource, resources) { + if (resource.name() == name && + resource.type() == Value::RANGES) { + total += resource.ranges(); + found = true; + } + } + + if (found) { + return total; + } + + return None(); +} + + +Resources Resources::get(const string& name) const +{ + return filter([=](const Resource& resource) { + return resource.name() == name; + }); +} + + +Resources Resources::scalars() const +{ + return filter([=](const Resource& resource) { + return resource.type() == Value::SCALAR; + }); +} + + +set<string> Resources::names() const +{ + set<string> result; + foreach(const Resource& resource, resources) { + result.insert(resource.name()); + } + + return result; +} + + +map<string, Value_Type> Resources::types() const +{ + map<string, Value_Type> result; + foreach(const Resource& resource, resources) { + result[resource.name()] = resource.type(); + } + + return result; +} + + +Option<double> Resources::cpus() const +{ + Option<Value::Scalar> value = get<Value::Scalar>("cpus"); + if (value.isSome()) { + return value.get().value(); + } else { + return None(); + } +} + + +Option<Bytes> Resources::mem() const +{ + Option<Value::Scalar> value = get<Value::Scalar>("mem"); + if (value.isSome()) { + return Megabytes(static_cast<uint64_t>(value.get().value())); + } else { + return None(); + } +} + + +Option<Bytes> Resources::disk() const +{ + Option<Value::Scalar> value = get<Value::Scalar>("disk"); + if (value.isSome()) { + return Megabytes(static_cast<uint64_t>(value.get().value())); + } else { + return None(); + } +} + + +Option<Value::Ranges> Resources::ports() const +{ + Option<Value::Ranges> value = get<Value::Ranges>("ports"); + if (value.isSome()) { + return value.get(); + } else { + return None(); + } +} + + +Option<Value::Ranges> Resources::ephemeral_ports() const +{ + Option<Value::Ranges> value = get<Value::Ranges>("ephemeral_ports"); + if (value.isSome()) { + return value.get(); + } else { + return None(); + } +} + + +bool Resources::_contains(const Resource& that) const +{ + foreach (const Resource& resource, resources) { + if (internal::contains(resource, that)) { + return true; + } + } + + return false; +} + + +///////////////////////////////////////////////// +// Overloaded operators. +///////////////////////////////////////////////// + + +Resources::operator const google::protobuf::RepeatedPtrField<Resource>& () const +{ + return resources; +} + + +bool Resources::operator == (const Resources& that) const +{ + return this->contains(that) && that.contains(*this); +} + + +bool Resources::operator != (const Resources& that) const +{ + return !(*this == that); +} + + +Resources Resources::operator + (const Resource& that) const +{ + Resources result = *this; + result += that; + return result; +} + + +Resources Resources::operator + (const Resources& that) const +{ + Resources result = *this; + result += that; + return result; +} + + +Resources& Resources::operator += (const Resource& that) +{ + if (validate(that).isNone() && !isEmpty(that)) { + bool found = false; + foreach (Resource& resource, resources) { + if (internal::addable(resource, that)) { + resource += that; + found = true; + break; + } + } + + // Cannot be combined with any existing Resource object. + if (!found) { + resources.Add()->CopyFrom(that); + } + } + + return *this; +} + + +Resources& Resources::operator += (const Resources& that) +{ + foreach (const Resource& resource, that.resources) { + *this += resource; + } + + return *this; +} + + +Resources Resources::operator - (const Resource& that) const +{ + Resources result = *this; + result -= that; + return result; +} + + +Resources Resources::operator - (const Resources& that) const +{ + Resources result = *this; + result -= that; + return result; +} + + +Resources& Resources::operator -= (const Resource& that) +{ + if (validate(that).isNone() && !isEmpty(that)) { + for (int i = 0; i < resources.size(); i++) { + Resource* resource = resources.Mutable(i); + + if (internal::subtractable(*resource, that)) { + *resource -= that; + + // Remove the resource if it becomes invalid or zero. We need + // to do the validation because we want to strip negative + // scalar Resource object. + if (validate(*resource).isSome() || isEmpty(*resource)) { + resources.DeleteSubrange(i, 1); + } + + break; + } + } + } + + return *this; +} + + +Resources& Resources::operator -= (const Resources& that) +{ + foreach (const Resource& resource, that.resources) { + *this -= resource; + } + + return *this; +} + + +ostream& operator << (ostream& stream, const Volume& volume) { + string volumeConfig = volume.container_path(); + + if (volume.has_host_path()) { + volumeConfig = volume.host_path() + ":" + volumeConfig; + + if (volume.has_mode()) { + switch (volume.mode()) { + case Volume::RW: volumeConfig += ":rw"; break; + case Volume::RO: volumeConfig += ":ro"; break; + default: + LOG(FATAL) << "Unknown Volume mode: " << volume.mode(); + break; + } + } + } + + stream << volumeConfig; + + return stream; +} + + +ostream& operator << (ostream& stream, const Resource::DiskInfo& disk) { + if (disk.has_persistence()) { + stream << disk.persistence().id(); + } + + if (disk.has_volume()) { + stream << ":" << disk.volume(); + } + + return stream; +} + + +ostream& operator << (ostream& stream, const Resource& resource) +{ + stream << resource.name(); + + stream << "(" << resource.role(); + + if (resource.has_reservation()) { + stream << ", " << resource.reservation().principal(); + } + + stream << ")"; + + if (resource.has_disk()) { + stream << "[" << resource.disk() << "]"; + } + + // Once extended revocable attributes are available, change this to a more + // meaningful value. + if (resource.has_revocable()) { + stream << "{REV}"; + } + + stream << ":"; + + switch (resource.type()) { + case Value::SCALAR: stream << resource.scalar(); break; + case Value::RANGES: stream << resource.ranges(); break; + case Value::SET: stream << resource.set(); break; + default: + LOG(FATAL) << "Unexpected Value type: " << resource.type(); + break; + } + + return stream; +} + + +ostream& operator << (ostream& stream, const Resources& resources) +{ + Resources::const_iterator it = resources.begin(); + + while (it != resources.end()) { + stream << *it; + if (++it != resources.end()) { + stream << "; "; + } + } + + return stream; +} + +} // namespace v1 { +} // namespace mesos {
