Renamed low level scheduler libprocess test to event call framework test. Also deleted the low level scheduler pthread test.
Review: https://reviews.apache.org/r/36077 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6e168073 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6e168073 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6e168073 Branch: refs/heads/master Commit: 6e16807359d0f1ed04fb74556bba8230051d10f5 Parents: 5d06b3d Author: Vinod Kone <[email protected]> Authored: Tue Jun 30 19:14:20 2015 -0700 Committer: Vinod Kone <[email protected]> Committed: Wed Jul 1 17:54:57 2015 -0700 ---------------------------------------------------------------------- src/Makefile.am | 16 +- src/examples/event_call_framework.cpp | 438 ++++++++++++++++ src/examples/low_level_scheduler_libprocess.cpp | 438 ---------------- src/examples/low_level_scheduler_pthread.cpp | 495 ------------------- src/tests/event_call_framework_test.sh | 31 ++ src/tests/examples_tests.cpp | 5 +- .../low_level_scheduler_libprocess_test.sh | 31 -- src/tests/low_level_scheduler_pthread_test.sh | 31 -- 8 files changed, 475 insertions(+), 1010 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/6e168073/src/Makefile.am ---------------------------------------------------------------------- diff --git a/src/Makefile.am b/src/Makefile.am index a064d17..addb63f 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1311,15 +1311,10 @@ clean-python: PHONY_TARGETS += clean-python # Test (make check) binaries. -check_PROGRAMS += low-level-scheduler-libprocess -low_level_scheduler_libprocess_SOURCES = examples/low_level_scheduler_libprocess.cpp -low_level_scheduler_libprocess_CPPFLAGS = $(MESOS_CPPFLAGS) -low_level_scheduler_libprocess_LDADD = libmesos.la $(LDADD) - -check_PROGRAMS += low-level-scheduler-pthread -low_level_scheduler_pthread_SOURCES = examples/low_level_scheduler_pthread.cpp -low_level_scheduler_pthread_CPPFLAGS = $(MESOS_CPPFLAGS) -low_level_scheduler_pthread_LDADD = libmesos.la $(LDADD) +check_PROGRAMS += event-call-framework +event_call_framework_SOURCES = examples/event_call_framework.cpp +event_call_framework_CPPFLAGS = $(MESOS_CPPFLAGS) +event_call_framework_LDADD = libmesos.la $(LDADD) check_PROGRAMS += test-framework test_framework_SOURCES = examples/test_framework.cpp @@ -1601,8 +1596,7 @@ EXTRA_DIST += examples/python/test_containerizer.py \ dist_check_SCRIPTS += \ tests/balloon_framework_test.sh \ - tests/low_level_scheduler_libprocess_test.sh \ - tests/low_level_scheduler_pthread_test.sh \ + tests/event_call_framework_test.sh \ tests/java_exception_test.sh \ tests/java_framework_test.sh \ tests/java_log_test.sh \ http://git-wip-us.apache.org/repos/asf/mesos/blob/6e168073/src/examples/event_call_framework.cpp ---------------------------------------------------------------------- diff --git a/src/examples/event_call_framework.cpp b/src/examples/event_call_framework.cpp new file mode 100644 index 0000000..b9de22f --- /dev/null +++ b/src/examples/event_call_framework.cpp @@ -0,0 +1,438 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <iostream> +#include <string> +#include <queue> + +#include <boost/lexical_cast.hpp> + +#include <mesos/resources.hpp> +#include <mesos/scheduler.hpp> +#include <mesos/type_utils.hpp> + +#include <process/delay.hpp> +#include <process/process.hpp> +#include <process/protobuf.hpp> + +#include <stout/check.hpp> +#include <stout/duration.hpp> +#include <stout/exit.hpp> +#include <stout/flags.hpp> +#include <stout/foreach.hpp> +#include <stout/lambda.hpp> +#include <stout/none.hpp> +#include <stout/numify.hpp> +#include <stout/option.hpp> +#include <stout/os.hpp> +#include <stout/option.hpp> +#include <stout/path.hpp> +#include <stout/stringify.hpp> + +#include "common/status_utils.hpp" + +#include "logging/flags.hpp" +#include "logging/logging.hpp" + +using namespace mesos; + +using std::cerr; +using std::cout; +using std::endl; +using std::queue; +using std::string; +using std::vector; + +using boost::lexical_cast; + +using mesos::Resources; +using mesos::scheduler::Call; +using mesos::scheduler::Event; + +const int32_t CPUS_PER_TASK = 1; +const int32_t MEM_PER_TASK = 128; + +class EventCallScheduler : public process::Process<EventCallScheduler> +{ +public: + EventCallScheduler(const FrameworkInfo& _framework, + const ExecutorInfo& _executor, + const string& master) + : framework(_framework), + executor(_executor), + mesos(master, + process::defer(self(), &Self::connected), + process::defer(self(), &Self::disconnected), + process::defer(self(), &Self::received, lambda::_1)), + state(INITIALIZING), + tasksLaunched(0), + tasksFinished(0), + totalTasks(5) {} + + EventCallScheduler(const FrameworkInfo& _framework, + const ExecutorInfo& _executor, + const string& master, + const Credential& credential) + : framework(_framework), + executor(_executor), + mesos(master, + credential, + process::defer(self(), &Self::connected), + process::defer(self(), &Self::disconnected), + process::defer(self(), &Self::received, lambda::_1)), + state(INITIALIZING), + tasksLaunched(0), + tasksFinished(0), + totalTasks(5) {} + + ~EventCallScheduler() {} + + void connected() + { + doReliableRegistration(); + } + + void disconnected() + { + state = DISCONNECTED; + } + + void received(queue<Event> events) + { + while (!events.empty()) { + Event event = events.front(); + events.pop(); + + switch (event.type()) { + case Event::SUBSCRIBED: { + cout << endl << "Received a SUBSCRIBED event" << endl; + + framework.mutable_id()->CopyFrom(event.subscribed().framework_id()); + state = SUBSCRIBED; + + cout << "Subscribed with ID '" << framework.id() << endl; + break; + } + + case Event::OFFERS: { + cout << endl << "Received an OFFERS event" << endl; + resourceOffers(google::protobuf::convert(event.offers().offers())); + break; + } + + case Event::RESCIND: { + cout << endl << "Received a RESCIND event" << endl; + break; + } + + case Event::UPDATE: { + cout << endl << "Received an UPDATE event" << endl; + + // TODO(zuyu): Do batch processing of UPDATE events. + statusUpdate(event.update().status()); + break; + } + + case Event::MESSAGE: { + cout << endl << "Received a MESSAGE event" << endl; + break; + } + + case Event::FAILURE: { + cout << endl << "Received a FAILURE event" << endl; + + if (event.failure().has_executor_id()) { + // Executor failed. + cout << "Executor '" + << event.failure().executor_id().value() << "' terminated"; + + if (event.failure().has_slave_id()) { + cout << " on Slave '" + << event.failure().slave_id().value() << "'"; + } + + if (event.failure().has_status()) { + cout << ", and " << WSTRINGIFY(event.failure().status()); + } + + cout << endl; + } else if (event.failure().has_slave_id()) { + // Slave failed. + cout << "Slave '" << event.failure().slave_id().value() + << "' terminated" << endl; + } + break; + } + + case Event::ERROR: { + cout << endl << "Received an ERROR event: " + << event.error().message() << endl; + process::terminate(self()); + break; + } + + default: { + EXIT(1) << "Received an UNKNOWN event"; + } + } + } + } + +private: + void resourceOffers(const vector<Offer>& offers) + { + foreach (const Offer& offer, offers) { + cout << "Received offer " << offer.id() << " with " << offer.resources() + << endl; + + static const Resources TASK_RESOURCES = Resources::parse( + "cpus:" + stringify(CPUS_PER_TASK) + + ";mem:" + stringify(MEM_PER_TASK)).get(); + + Resources remaining = offer.resources(); + + // Launch tasks. + vector<TaskInfo> tasks; + while (tasksLaunched < totalTasks && + remaining.flatten().contains(TASK_RESOURCES)) { + int taskId = tasksLaunched++; + + cout << "Launching task " << taskId << " using offer " + << offer.id() << endl; + + TaskInfo task; + task.set_name("Task " + lexical_cast<string>(taskId)); + task.mutable_task_id()->set_value( + lexical_cast<string>(taskId)); + task.mutable_slave_id()->MergeFrom(offer.slave_id()); + task.mutable_executor()->MergeFrom(executor); + + Option<Resources> resources = + remaining.find(TASK_RESOURCES.flatten(framework.role())); + + CHECK_SOME(resources); + task.mutable_resources()->MergeFrom(resources.get()); + remaining -= resources.get(); + + tasks.push_back(task); + } + + Call call; + call.mutable_framework_info()->CopyFrom(framework); + call.set_type(Call::ACCEPT); + + Call::Accept* accept = call.mutable_accept(); + accept->add_offer_ids()->CopyFrom(offer.id()); + + Offer::Operation* operation = accept->add_operations(); + operation->set_type(Offer::Operation::LAUNCH); + foreach (const TaskInfo& taskInfo, tasks) { + operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo); + } + + mesos.send(call); + } + } + + void statusUpdate(const TaskStatus& status) + { + cout << "Task " << status.task_id() << " is in state " << status.state(); + + if (status.has_message()) { + cout << " with message '" << status.message() << "'"; + } + cout << endl; + + if (status.has_uuid()) { + Call call; + call.mutable_framework_info()->CopyFrom(framework); + call.set_type(Call::ACKNOWLEDGE); + + Call::Acknowledge* ack = call.mutable_acknowledge(); + ack->mutable_slave_id()->CopyFrom(status.slave_id()); + ack->mutable_task_id ()->CopyFrom(status.task_id ()); + ack->set_uuid(status.uuid()); + + mesos.send(call); + } + + if (status.state() == TASK_FINISHED) { + ++tasksFinished; + } + + if (status.state() == TASK_LOST || + status.state() == TASK_KILLED || + status.state() == TASK_FAILED) { + EXIT(1) << "Exiting because task " << status.task_id() + << " is in unexpected state " << status.state() + << " with reason " << status.reason() + << " from source " << status.source() + << " with message '" << status.message() << "'"; + } + + if (tasksFinished == totalTasks) { + process::terminate(self()); + } + } + + void doReliableRegistration() + { + if (state == SUBSCRIBED) { + return; + } + + Call call; + call.mutable_framework_info()->CopyFrom(framework); + call.set_type(Call::SUBSCRIBE); + + mesos.send(call); + + process::delay(Seconds(1), + self(), + &Self::doReliableRegistration); + } + + void finalize() + { + Call call; + call.mutable_framework_info()->CopyFrom(framework); + call.set_type(Call::TEARDOWN); + + mesos.send(call); + } + + FrameworkInfo framework; + const ExecutorInfo executor; + scheduler::Mesos mesos; + + enum State { + INITIALIZING = 0, + SUBSCRIBED = 1, + DISCONNECTED = 2 + } state; + + int tasksLaunched; + int tasksFinished; + const int totalTasks; +}; + + +void usage(const char* argv0, const flags::FlagsBase& flags) +{ + cerr << "Usage: " << Path(argv0).basename() << " [...]" << endl + << endl + << "Supported options:" << endl + << flags.usage(); +} + + +int main(int argc, char** argv) +{ + // Find this executable's directory to locate executor. + string uri; + Option<string> value = os::getenv("MESOS_BUILD_DIR"); + if (value.isSome()) { + uri = path::join(value.get(), "src", "test-executor"); + } else { + uri = path::join( + os::realpath(Path(argv[0]).dirname()).get(), + "src", + "test-executor"); + } + + mesos::internal::logging::Flags flags; + + string role; + flags.add(&role, + "role", + "Role to use when registering", + "*"); + + Option<string> master; + flags.add(&master, + "master", + "ip:port of master to connect"); + + Try<Nothing> load = flags.load(None(), argc, argv); + + if (load.isError()) { + cerr << load.error() << endl; + usage(argv[0], flags); + EXIT(1); + } else if (master.isNone()) { + cerr << "Missing --master" << endl; + usage(argv[0], flags); + EXIT(1); + } + + process::initialize(); + internal::logging::initialize(argv[0], flags, true); // Catch signals. + + FrameworkInfo framework; + framework.set_user(""); // Have Mesos fill in the current user. + framework.set_name("Event Call Scheduler using libprocess (C++)"); + framework.set_role(role); + + value = os::getenv("MESOS_CHECKPOINT"); + if (value.isSome()) { + framework.set_checkpoint( + numify<bool>(value.get()).get()); + } + + ExecutorInfo executor; + executor.mutable_executor_id()->set_value("default"); + executor.mutable_command()->set_value(uri); + executor.set_name("Test Executor (C++)"); + executor.set_source("cpp_test"); + + EventCallScheduler* scheduler; + if (os::getenv("MESOS_AUTHENTICATE").isSome()) { + cout << "Enabling authentication for the scheduler" << endl; + + value = os::getenv("DEFAULT_PRINCIPAL"); + if (value.isNone()) { + EXIT(1) << "Expecting authentication principal in the environment"; + } + + Credential credential; + credential.set_principal(value.get()); + + framework.set_principal(value.get()); + + value = os::getenv("DEFAULT_SECRET"); + if (value.isNone()) { + EXIT(1) << "Expecting authentication secret in the environment"; + } + + credential.set_secret(value.get()); + + scheduler = + new EventCallScheduler(framework, executor, master.get(), credential); + } else { + framework.set_principal("event-call-scheduler-cpp"); + + scheduler = + new EventCallScheduler(framework, executor, master.get()); + } + + process::spawn(scheduler); + process::wait(scheduler); + delete scheduler; + + return EXIT_SUCCESS; +} http://git-wip-us.apache.org/repos/asf/mesos/blob/6e168073/src/examples/low_level_scheduler_libprocess.cpp ---------------------------------------------------------------------- diff --git a/src/examples/low_level_scheduler_libprocess.cpp b/src/examples/low_level_scheduler_libprocess.cpp deleted file mode 100644 index 63e42bc..0000000 --- a/src/examples/low_level_scheduler_libprocess.cpp +++ /dev/null @@ -1,438 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <iostream> -#include <string> -#include <queue> - -#include <boost/lexical_cast.hpp> - -#include <mesos/resources.hpp> -#include <mesos/scheduler.hpp> -#include <mesos/type_utils.hpp> - -#include <process/delay.hpp> -#include <process/process.hpp> -#include <process/protobuf.hpp> - -#include <stout/check.hpp> -#include <stout/duration.hpp> -#include <stout/exit.hpp> -#include <stout/flags.hpp> -#include <stout/foreach.hpp> -#include <stout/lambda.hpp> -#include <stout/none.hpp> -#include <stout/numify.hpp> -#include <stout/option.hpp> -#include <stout/os.hpp> -#include <stout/option.hpp> -#include <stout/path.hpp> -#include <stout/stringify.hpp> - -#include "common/status_utils.hpp" - -#include "logging/flags.hpp" -#include "logging/logging.hpp" - -using namespace mesos; - -using std::cerr; -using std::cout; -using std::endl; -using std::queue; -using std::string; -using std::vector; - -using boost::lexical_cast; - -using mesos::Resources; -using mesos::scheduler::Call; -using mesos::scheduler::Event; - -const int32_t CPUS_PER_TASK = 1; -const int32_t MEM_PER_TASK = 128; - -class LowLevelScheduler : public process::Process<LowLevelScheduler> -{ -public: - LowLevelScheduler(const FrameworkInfo& _framework, - const ExecutorInfo& _executor, - const string& master) - : framework(_framework), - executor(_executor), - mesos(master, - process::defer(self(), &Self::connected), - process::defer(self(), &Self::disconnected), - process::defer(self(), &Self::received, lambda::_1)), - state(INITIALIZING), - tasksLaunched(0), - tasksFinished(0), - totalTasks(5) {} - - LowLevelScheduler(const FrameworkInfo& _framework, - const ExecutorInfo& _executor, - const string& master, - const Credential& credential) - : framework(_framework), - executor(_executor), - mesos(master, - credential, - process::defer(self(), &Self::connected), - process::defer(self(), &Self::disconnected), - process::defer(self(), &Self::received, lambda::_1)), - state(INITIALIZING), - tasksLaunched(0), - tasksFinished(0), - totalTasks(5) {} - - ~LowLevelScheduler() {} - - void connected() - { - doReliableRegistration(); - } - - void disconnected() - { - state = DISCONNECTED; - } - - void received(queue<Event> events) - { - while (!events.empty()) { - Event event = events.front(); - events.pop(); - - switch (event.type()) { - case Event::SUBSCRIBED: { - cout << endl << "Received a SUBSCRIBED event" << endl; - - framework.mutable_id()->CopyFrom(event.subscribed().framework_id()); - state = SUBSCRIBED; - - cout << "Subscribed with ID '" << framework.id() << endl; - break; - } - - case Event::OFFERS: { - cout << endl << "Received an OFFERS event" << endl; - resourceOffers(google::protobuf::convert(event.offers().offers())); - break; - } - - case Event::RESCIND: { - cout << endl << "Received a RESCIND event" << endl; - break; - } - - case Event::UPDATE: { - cout << endl << "Received an UPDATE event" << endl; - - // TODO(zuyu): Do batch processing of UPDATE events. - statusUpdate(event.update().status()); - break; - } - - case Event::MESSAGE: { - cout << endl << "Received a MESSAGE event" << endl; - break; - } - - case Event::FAILURE: { - cout << endl << "Received a FAILURE event" << endl; - - if (event.failure().has_executor_id()) { - // Executor failed. - cout << "Executor '" - << event.failure().executor_id().value() << "' terminated"; - - if (event.failure().has_slave_id()) { - cout << " on Slave '" - << event.failure().slave_id().value() << "'"; - } - - if (event.failure().has_status()) { - cout << ", and " << WSTRINGIFY(event.failure().status()); - } - - cout << endl; - } else if (event.failure().has_slave_id()) { - // Slave failed. - cout << "Slave '" << event.failure().slave_id().value() - << "' terminated" << endl; - } - break; - } - - case Event::ERROR: { - cout << endl << "Received an ERROR event: " - << event.error().message() << endl; - process::terminate(self()); - break; - } - - default: { - EXIT(1) << "Received an UNKNOWN event"; - } - } - } - } - -private: - void resourceOffers(const vector<Offer>& offers) - { - foreach (const Offer& offer, offers) { - cout << "Received offer " << offer.id() << " with " << offer.resources() - << endl; - - static const Resources TASK_RESOURCES = Resources::parse( - "cpus:" + stringify(CPUS_PER_TASK) + - ";mem:" + stringify(MEM_PER_TASK)).get(); - - Resources remaining = offer.resources(); - - // Launch tasks. - vector<TaskInfo> tasks; - while (tasksLaunched < totalTasks && - remaining.flatten().contains(TASK_RESOURCES)) { - int taskId = tasksLaunched++; - - cout << "Launching task " << taskId << " using offer " - << offer.id() << endl; - - TaskInfo task; - task.set_name("Task " + lexical_cast<string>(taskId)); - task.mutable_task_id()->set_value( - lexical_cast<string>(taskId)); - task.mutable_slave_id()->MergeFrom(offer.slave_id()); - task.mutable_executor()->MergeFrom(executor); - - Option<Resources> resources = - remaining.find(TASK_RESOURCES.flatten(framework.role())); - - CHECK_SOME(resources); - task.mutable_resources()->MergeFrom(resources.get()); - remaining -= resources.get(); - - tasks.push_back(task); - } - - Call call; - call.mutable_framework_info()->CopyFrom(framework); - call.set_type(Call::ACCEPT); - - Call::Accept* accept = call.mutable_accept(); - accept->add_offer_ids()->CopyFrom(offer.id()); - - Offer::Operation* operation = accept->add_operations(); - operation->set_type(Offer::Operation::LAUNCH); - foreach (const TaskInfo& taskInfo, tasks) { - operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo); - } - - mesos.send(call); - } - } - - void statusUpdate(const TaskStatus& status) - { - cout << "Task " << status.task_id() << " is in state " << status.state(); - - if (status.has_message()) { - cout << " with message '" << status.message() << "'"; - } - cout << endl; - - if (status.has_uuid()) { - Call call; - call.mutable_framework_info()->CopyFrom(framework); - call.set_type(Call::ACKNOWLEDGE); - - Call::Acknowledge* ack = call.mutable_acknowledge(); - ack->mutable_slave_id()->CopyFrom(status.slave_id()); - ack->mutable_task_id ()->CopyFrom(status.task_id ()); - ack->set_uuid(status.uuid()); - - mesos.send(call); - } - - if (status.state() == TASK_FINISHED) { - ++tasksFinished; - } - - if (status.state() == TASK_LOST || - status.state() == TASK_KILLED || - status.state() == TASK_FAILED) { - EXIT(1) << "Exiting because task " << status.task_id() - << " is in unexpected state " << status.state() - << " with reason " << status.reason() - << " from source " << status.source() - << " with message '" << status.message() << "'"; - } - - if (tasksFinished == totalTasks) { - process::terminate(self()); - } - } - - void doReliableRegistration() - { - if (state == SUBSCRIBED) { - return; - } - - Call call; - call.mutable_framework_info()->CopyFrom(framework); - call.set_type(Call::SUBSCRIBE); - - mesos.send(call); - - process::delay(Seconds(1), - self(), - &Self::doReliableRegistration); - } - - void finalize() - { - Call call; - call.mutable_framework_info()->CopyFrom(framework); - call.set_type(Call::TEARDOWN); - - mesos.send(call); - } - - FrameworkInfo framework; - const ExecutorInfo executor; - scheduler::Mesos mesos; - - enum State { - INITIALIZING = 0, - SUBSCRIBED = 1, - DISCONNECTED = 2 - } state; - - int tasksLaunched; - int tasksFinished; - const int totalTasks; -}; - - -void usage(const char* argv0, const flags::FlagsBase& flags) -{ - cerr << "Usage: " << Path(argv0).basename() << " [...]" << endl - << endl - << "Supported options:" << endl - << flags.usage(); -} - - -int main(int argc, char** argv) -{ - // Find this executable's directory to locate executor. - string uri; - Option<string> value = os::getenv("MESOS_BUILD_DIR"); - if (value.isSome()) { - uri = path::join(value.get(), "src", "test-executor"); - } else { - uri = path::join( - os::realpath(Path(argv[0]).dirname()).get(), - "src", - "test-executor"); - } - - mesos::internal::logging::Flags flags; - - string role; - flags.add(&role, - "role", - "Role to use when registering", - "*"); - - Option<string> master; - flags.add(&master, - "master", - "ip:port of master to connect"); - - Try<Nothing> load = flags.load(None(), argc, argv); - - if (load.isError()) { - cerr << load.error() << endl; - usage(argv[0], flags); - EXIT(1); - } else if (master.isNone()) { - cerr << "Missing --master" << endl; - usage(argv[0], flags); - EXIT(1); - } - - process::initialize(); - internal::logging::initialize(argv[0], flags, true); // Catch signals. - - FrameworkInfo framework; - framework.set_user(""); // Have Mesos fill in the current user. - framework.set_name("Low-Level Scheduler using libprocess (C++)"); - framework.set_role(role); - - value = os::getenv("MESOS_CHECKPOINT"); - if (value.isSome()) { - framework.set_checkpoint( - numify<bool>(value.get()).get()); - } - - ExecutorInfo executor; - executor.mutable_executor_id()->set_value("default"); - executor.mutable_command()->set_value(uri); - executor.set_name("Test Executor (C++)"); - executor.set_source("cpp_test"); - - LowLevelScheduler* scheduler; - if (os::getenv("MESOS_AUTHENTICATE").isSome()) { - cout << "Enabling authentication for the scheduler" << endl; - - value = os::getenv("DEFAULT_PRINCIPAL"); - if (value.isNone()) { - EXIT(1) << "Expecting authentication principal in the environment"; - } - - Credential credential; - credential.set_principal(value.get()); - - framework.set_principal(value.get()); - - value = os::getenv("DEFAULT_SECRET"); - if (value.isNone()) { - EXIT(1) << "Expecting authentication secret in the environment"; - } - - credential.set_secret(value.get()); - - scheduler = - new LowLevelScheduler(framework, executor, master.get(), credential); - } else { - framework.set_principal("low-level-scheduler-cpp"); - - scheduler = - new LowLevelScheduler(framework, executor, master.get()); - } - - process::spawn(scheduler); - process::wait(scheduler); - delete scheduler; - - return EXIT_SUCCESS; -} http://git-wip-us.apache.org/repos/asf/mesos/blob/6e168073/src/examples/low_level_scheduler_pthread.cpp ---------------------------------------------------------------------- diff --git a/src/examples/low_level_scheduler_pthread.cpp b/src/examples/low_level_scheduler_pthread.cpp deleted file mode 100644 index 7630d41..0000000 --- a/src/examples/low_level_scheduler_pthread.cpp +++ /dev/null @@ -1,495 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <pthread.h> -#include <unistd.h> - -#include <iostream> -#include <string> -#include <queue> - -#include <boost/lexical_cast.hpp> - -#include <mesos/resources.hpp> -#include <mesos/scheduler.hpp> -#include <mesos/type_utils.hpp> - -#include <process/protobuf.hpp> - -#include <stout/check.hpp> -#include <stout/exit.hpp> -#include <stout/flags.hpp> -#include <stout/foreach.hpp> -#include <stout/lambda.hpp> -#include <stout/none.hpp> -#include <stout/numify.hpp> -#include <stout/option.hpp> -#include <stout/os.hpp> -#include <stout/path.hpp> -#include <stout/stringify.hpp> - -#include "common/status_utils.hpp" - -#include "logging/flags.hpp" -#include "logging/logging.hpp" - -using namespace mesos; - -using std::cerr; -using std::cout; -using std::endl; -using std::queue; -using std::string; -using std::vector; - -using boost::lexical_cast; - -using mesos::Resources; -using mesos::scheduler::Call; -using mesos::scheduler::Event; - -const int32_t CPUS_PER_TASK = 1; -const int32_t MEM_PER_TASK = 128; - -class LowLevelScheduler -{ -public: - LowLevelScheduler(const FrameworkInfo& _framework, - const ExecutorInfo& _executor, - const string& master) - : framework(_framework), - executor(_executor), - mesos(master, - lambda::bind(&LowLevelScheduler::connected, this), - lambda::bind(&LowLevelScheduler::disconnected, this), - lambda::bind(&LowLevelScheduler::received, this, lambda::_1)), - state(INITIALIZING), - tasksLaunched(0), - tasksFinished(0), - totalTasks(5) - { - pthread_mutex_init(&mutex, NULL); - pthread_cond_init(&cond, 0); - } - - LowLevelScheduler(const FrameworkInfo& _framework, - const ExecutorInfo& _executor, - const string& master, - const Credential& credential) - : framework(_framework), - executor(_executor), - mesos(master, - credential, - lambda::bind(&LowLevelScheduler::connected, this), - lambda::bind(&LowLevelScheduler::disconnected, this), - lambda::bind(&LowLevelScheduler::received, this, lambda::_1)), - state(INITIALIZING), - tasksLaunched(0), - tasksFinished(0), - totalTasks(5) - { - pthread_mutex_init(&mutex, NULL); - pthread_cond_init(&cond, 0); - } - - ~LowLevelScheduler() - { - pthread_mutex_destroy(&mutex); - pthread_cond_destroy(&cond); - } - - void connected() - { - pthread_mutex_lock(&mutex); - state = CONNECTED; - - // Unblock the main thread calling wait(). - pthread_cond_signal(&cond); - pthread_mutex_unlock(&mutex); - } - - void disconnected() - { - pthread_mutex_lock(&mutex); - state = DISCONNECTED; - pthread_mutex_unlock(&mutex); - } - - void received(queue<Event> events) - { - while (!events.empty()) { - Event event = events.front(); - events.pop(); - - switch (event.type()) { - case Event::SUBSCRIBED: { - cout << endl << "Received a SUBSCRIBED event" << endl; - - pthread_mutex_lock(&mutex); - state = SUBSCRIBED; - pthread_mutex_unlock(&mutex); - - framework.mutable_id()->CopyFrom(event.subscribed().framework_id()); - - cout << "Subscribed with ID '" << framework.id() << endl; - break; - } - - case Event::OFFERS: { - cout << endl << "Received an OFFERS event" << endl; - resourceOffers(google::protobuf::convert(event.offers().offers())); - break; - } - - case Event::RESCIND: { - cout << endl << "Received a RESCIND event" << endl; - break; - } - - case Event::UPDATE: { - cout << endl << "Received an UPDATE event" << endl; - - // TODO(zuyu): Do batch processing of UPDATE events. - statusUpdate(event.update().status()); - break; - } - - case Event::MESSAGE: { - cout << endl << "Received a MESSAGE event" << endl; - break; - } - - case Event::FAILURE: { - cout << endl << "Received a FAILURE event" << endl; - - if (event.failure().has_executor_id()) { - // Executor failed. - cout << "Executor '" - << event.failure().executor_id().value() << "' terminated"; - - if (event.failure().has_slave_id()) { - cout << " on Slave '" - << event.failure().slave_id().value() << "'"; - } - - if (event.failure().has_status()) { - cout << ", and " << WSTRINGIFY(event.failure().status()); - } - - cout << endl; - } else { - // Slave failed. - cout << "Slave '" << event.failure().slave_id().value() - << "' terminated" << endl; - } - break; - } - - case Event::ERROR: { - cout << endl << "Received an ERROR event: " - << event.error().message() << endl; - finalize(); - break; - } - - default: { - EXIT(1) << "Received an UNKNOWN event"; - } - } - } - } - - void wait() - { - pthread_mutex_lock(&mutex); - if (state == INITIALIZING) { - // wait for connected() to be called. - pthread_cond_wait(&cond, &mutex); - } - - // CONNECTED state. - pthread_mutex_unlock(&mutex); - - while (true) { - pthread_mutex_lock(&mutex); - if (state == CONNECTED || - state == DISCONNECTED) { - pthread_mutex_unlock(&mutex); - doRegistration(); - sleep(1); - } else if (state == DONE) { - pthread_mutex_unlock(&mutex); - break; - } else { - pthread_cond_wait(&cond, &mutex); - pthread_mutex_unlock(&mutex); - } - } - } - -private: - void resourceOffers(const vector<Offer>& offers) - { - foreach (const Offer& offer, offers) { - cout << "Received offer " << offer.id() << " with " << offer.resources() - << endl; - - static const Resources TASK_RESOURCES = Resources::parse( - "cpus:" + stringify(CPUS_PER_TASK) + - ";mem:" + stringify(MEM_PER_TASK)).get(); - - Resources remaining = offer.resources(); - - // Launch tasks. - vector<TaskInfo> tasks; - while (tasksLaunched < totalTasks && - remaining.flatten().contains(TASK_RESOURCES)) { - int taskId = tasksLaunched++; - - cout << "Launching task " << taskId << " using offer " - << offer.id() << endl; - - TaskInfo task; - task.set_name("Task " + lexical_cast<string>(taskId)); - task.mutable_task_id()->set_value(lexical_cast<string>(taskId)); - task.mutable_slave_id()->MergeFrom(offer.slave_id()); - task.mutable_executor()->MergeFrom(executor); - - Option<Resources> resources = - remaining.find(TASK_RESOURCES.flatten(framework.role())); - - CHECK_SOME(resources); - task.mutable_resources()->MergeFrom(resources.get()); - remaining -= resources.get(); - - tasks.push_back(task); - } - - Call call; - call.mutable_framework_info()->CopyFrom(framework); - call.set_type(Call::ACCEPT); - - Call::Accept* accept = call.mutable_accept(); - accept->add_offer_ids()->CopyFrom(offer.id()); - - Offer::Operation* operation = accept->add_operations(); - operation->set_type(Offer::Operation::LAUNCH); - foreach (const TaskInfo& taskInfo, tasks) { - operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo); - } - - mesos.send(call); - } - } - - void statusUpdate(const TaskStatus& status) - { - cout << "Task " << status.task_id() << " is in state " << status.state(); - - if (status.has_message()) { - cout << " with message '" << status.message() << "'"; - } - cout << endl; - - if (status.has_uuid()) { - Call call; - call.set_type(Call::ACKNOWLEDGE); - call.mutable_framework_info()->CopyFrom(framework); - - Call::Acknowledge* ack = call.mutable_acknowledge(); - ack->mutable_slave_id()->CopyFrom(status.slave_id()); - ack->mutable_task_id ()->CopyFrom(status.task_id ()); - ack->set_uuid(status.uuid()); - - mesos.send(call); - } - - if (status.state() == TASK_FINISHED) { - ++tasksFinished; - } - - if (status.state() == TASK_LOST || - status.state() == TASK_KILLED || - status.state() == TASK_FAILED) { - EXIT(1) << "Exiting because task " << status.task_id() - << " is in unexpected state " << status.state() - << " with reason " << status.reason() - << " from source " << status.source() - << " with message '" << status.message() << "'"; - } - - if (tasksFinished == totalTasks) { - finalize(); - } - } - - void doRegistration() - { - pthread_mutex_lock(&mutex); - if (state != CONNECTED && - state != DISCONNECTED) { - pthread_mutex_unlock(&mutex); - return; - } - - Call call; - call.mutable_framework_info()->CopyFrom(framework); - call.set_type(Call::SUBSCRIBE); - pthread_mutex_unlock(&mutex); - - mesos.send(call); - } - - void finalize() - { - Call call; - call.set_type(Call::TEARDOWN); - call.mutable_framework_info()->CopyFrom(framework); - - mesos.send(call); - - pthread_mutex_lock(&mutex); - state = DONE; - pthread_cond_signal(&cond); - pthread_mutex_unlock(&mutex); - } - - FrameworkInfo framework; - const ExecutorInfo executor; - scheduler::Mesos mesos; - - enum State { - INITIALIZING = 0, - CONNECTED = 1, - SUBSCRIBED = 2, - DISCONNECTED = 3, - DONE = 4 - } state; - - int tasksLaunched; - int tasksFinished; - const int totalTasks; - - pthread_mutex_t mutex; - pthread_cond_t cond; -}; - - -void usage(const char* argv0, const flags::FlagsBase& flags) -{ - cerr << "Usage: " << Path(argv0).basename() << " [...]" << endl - << endl - << "Supported options:" << endl - << flags.usage(); -} - - -int main(int argc, char** argv) -{ - // Find this executable's directory to locate executor. - string uri; - Option<string> value = os::getenv("MESOS_BUILD_DIR"); - if (value.isSome()) { - uri = path::join(value.get(), "src", "test-executor"); - } else { - uri = - path::join( - os::realpath(Path(argv[0]).dirname()).get(), - "src", - "test-executor"); - } - - mesos::internal::logging::Flags flags; - - string role; - flags.add(&role, - "role", - "Role to use when registering", - "*"); - - Option<string> master; - flags.add(&master, - "master", - "ip:port of master to connect"); - - Try<Nothing> load = flags.load(None(), argc, argv); - - if (load.isError()) { - cerr << load.error() << endl; - usage(argv[0], flags); - EXIT(1); - } else if (master.isNone()) { - cerr << "Missing --master" << endl; - usage(argv[0], flags); - EXIT(1); - } - - internal::logging::initialize(argv[0], flags, true); // Catch signals. - - FrameworkInfo framework; - framework.set_user(""); // Have Mesos fill in the current user. - framework.set_name("Low-Level Scheduler using pthread (C++)"); - framework.set_role(role); - - value = os::getenv("MESOS_CHECKPOINT"); - if (value.isSome()) { - framework.set_checkpoint( - numify<bool>(value.get()).get()); - } - - ExecutorInfo executor; - executor.mutable_executor_id()->set_value("default"); - executor.mutable_command()->set_value(uri); - executor.set_name("Test Executor (C++)"); - executor.set_source("cpp_test"); - - LowLevelScheduler* scheduler; - if (os::getenv("MESOS_AUTHENTICATE").isSome()) { - cout << "Enabling authentication for the scheduler" << endl; - - value = os::getenv("DEFAULT_PRINCIPAL"); - if (value.isNone()) { - EXIT(1) << "Expecting authentication principal in the environment"; - } - - Credential credential; - credential.set_principal(value.get()); - - framework.set_principal(value.get()); - - value = os::getenv("DEFAULT_SECRET"); - if (value.isNone()) { - EXIT(1) << "Expecting authentication secret in the environment"; - } - - credential.set_secret(value.get()); - - scheduler = - new LowLevelScheduler(framework, executor, master.get(), credential); - } else { - framework.set_principal("low-level-scheduler-pthread-cpp"); - - scheduler = - new LowLevelScheduler(framework, executor, master.get()); - } - - scheduler->wait(); - delete scheduler; - - return EXIT_SUCCESS; -} http://git-wip-us.apache.org/repos/asf/mesos/blob/6e168073/src/tests/event_call_framework_test.sh ---------------------------------------------------------------------- diff --git a/src/tests/event_call_framework_test.sh b/src/tests/event_call_framework_test.sh new file mode 100755 index 0000000..e42c4e8 --- /dev/null +++ b/src/tests/event_call_framework_test.sh @@ -0,0 +1,31 @@ +#!/usr/bin/env bash + +# Expecting MESOS_SOURCE_DIR and MESOS_BUILD_DIR to be in environment. + +env | grep MESOS_SOURCE_DIR >/dev/null + +test $? != 0 && \ + echo "Failed to find MESOS_SOURCE_DIR in environment" && \ + exit 1 + +env | grep MESOS_BUILD_DIR >/dev/null + +test $? != 0 && \ + echo "Failed to find MESOS_BUILD_DIR in environment" && \ + exit 1 + +source ${MESOS_SOURCE_DIR}/support/atexit.sh + +MESOS_WORK_DIR=`mktemp -d -t mesos-XXXXXX` + +atexit "rm -rf ${MESOS_WORK_DIR}" +export MESOS_WORK_DIR=${MESOS_WORK_DIR} + +# Set local Mesos runner to use 3 slaves +export MESOS_NUM_SLAVES=3 + +# Set resources for the slave. +export MESOS_RESOURCES="cpus:2;mem:10240" + +# Check that the C++ low level scheduler executes without crashing (returns 0). +exec ${MESOS_BUILD_DIR}/src/event-call-framework --master=local http://git-wip-us.apache.org/repos/asf/mesos/blob/6e168073/src/tests/examples_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/examples_tests.cpp b/src/tests/examples_tests.cpp index 2ff6e7a..3f56b30 100644 --- a/src/tests/examples_tests.cpp +++ b/src/tests/examples_tests.cpp @@ -26,10 +26,7 @@ TEST_SCRIPT(ExamplesTest, TestFramework, "test_framework_test.sh") TEST_SCRIPT(ExamplesTest, NoExecutorFramework, "no_executor_framework_test.sh") -TEST_SCRIPT(ExamplesTest, LowLevelSchedulerLibprocess, - "low_level_scheduler_libprocess_test.sh") -TEST_SCRIPT(ExamplesTest, LowLevelSchedulerPthread, - "low_level_scheduler_pthread_test.sh") +TEST_SCRIPT(ExamplesTest, EventCallFramework, "event_call_framework_test.sh") TEST_SCRIPT(ExamplesTest, PersistentVolumeFramework, http://git-wip-us.apache.org/repos/asf/mesos/blob/6e168073/src/tests/low_level_scheduler_libprocess_test.sh ---------------------------------------------------------------------- diff --git a/src/tests/low_level_scheduler_libprocess_test.sh b/src/tests/low_level_scheduler_libprocess_test.sh deleted file mode 100755 index 7984559..0000000 --- a/src/tests/low_level_scheduler_libprocess_test.sh +++ /dev/null @@ -1,31 +0,0 @@ -#!/usr/bin/env bash - -# Expecting MESOS_SOURCE_DIR and MESOS_BUILD_DIR to be in environment. - -env | grep MESOS_SOURCE_DIR >/dev/null - -test $? != 0 && \ - echo "Failed to find MESOS_SOURCE_DIR in environment" && \ - exit 1 - -env | grep MESOS_BUILD_DIR >/dev/null - -test $? != 0 && \ - echo "Failed to find MESOS_BUILD_DIR in environment" && \ - exit 1 - -source ${MESOS_SOURCE_DIR}/support/atexit.sh - -MESOS_WORK_DIR=`mktemp -d -t mesos-XXXXXX` - -atexit "rm -rf ${MESOS_WORK_DIR}" -export MESOS_WORK_DIR=${MESOS_WORK_DIR} - -# Set local Mesos runner to use 3 slaves -export MESOS_NUM_SLAVES=3 - -# Set resources for the slave. -export MESOS_RESOURCES="cpus:2;mem:10240" - -# Check that the C++ low level scheduler executes without crashing (returns 0). -exec ${MESOS_BUILD_DIR}/src/low-level-scheduler-libprocess --master=local http://git-wip-us.apache.org/repos/asf/mesos/blob/6e168073/src/tests/low_level_scheduler_pthread_test.sh ---------------------------------------------------------------------- diff --git a/src/tests/low_level_scheduler_pthread_test.sh b/src/tests/low_level_scheduler_pthread_test.sh deleted file mode 100755 index c6e7115..0000000 --- a/src/tests/low_level_scheduler_pthread_test.sh +++ /dev/null @@ -1,31 +0,0 @@ -#!/usr/bin/env bash - -# Expecting MESOS_SOURCE_DIR and MESOS_BUILD_DIR to be in environment. - -env | grep MESOS_SOURCE_DIR >/dev/null - -test $? != 0 && \ - echo "Failed to find MESOS_SOURCE_DIR in environment" && \ - exit 1 - -env | grep MESOS_BUILD_DIR >/dev/null - -test $? != 0 && \ - echo "Failed to find MESOS_BUILD_DIR in environment" && \ - exit 1 - -source ${MESOS_SOURCE_DIR}/support/atexit.sh - -MESOS_WORK_DIR=`mktemp -d -t mesos-XXXXXX` - -atexit "rm -rf ${MESOS_WORK_DIR}" -export MESOS_WORK_DIR=${MESOS_WORK_DIR} - -# Set local Mesos runner to use 3 slaves -export MESOS_NUM_SLAVES=3 - -# Set resources for the slave. -export MESOS_RESOURCES="cpus:2;mem:10240" - -# Check that the C++ low level scheduler executes without crashing (returns 0). -exec ${MESOS_BUILD_DIR}/src/low-level-scheduler-pthread --master=local
