Repository: mesos Updated Branches: refs/heads/master 743e9e739 -> 4b15b9608
Added an example framework for testing persistent volumes. Review: https://reviews.apache.org/r/32984 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4b15b960 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4b15b960 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4b15b960 Branch: refs/heads/master Commit: 4b15b96089996243f040cc8e6956fae032511417 Parents: 2037099 Author: Jie Yu <[email protected]> Authored: Wed Apr 8 11:30:51 2015 -0700 Committer: Jie Yu <[email protected]> Committed: Fri Apr 10 14:56:36 2015 -0700 ---------------------------------------------------------------------- src/Makefile.am | 6 + src/examples/persistent_volume_framework.cpp | 497 +++++++++++++++++++++ src/tests/examples_tests.cpp | 3 + src/tests/persistent_volume_framework_test.sh | 28 ++ 4 files changed, 534 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/4b15b960/src/Makefile.am ---------------------------------------------------------------------- diff --git a/src/Makefile.am b/src/Makefile.am index fa609da..d15a373 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1276,6 +1276,11 @@ load_generator_framework_SOURCES = examples/load_generator_framework.cpp load_generator_framework_CPPFLAGS = $(MESOS_CPPFLAGS) load_generator_framework_LDADD = libmesos.la $(LDADD) +check_PROGRAMS += persistent-volume-framework +persistent_volume_framework_SOURCES = examples/persistent_volume_framework.cpp +persistent_volume_framework_CPPFLAGS = $(MESOS_CPPFLAGS) +persistent_volume_framework_LDADD = libmesos.la $(LDADD) + if OS_LINUX check_PROGRAMS += setns-test-helper setns_test_helper_SOURCES = \ @@ -1480,6 +1485,7 @@ dist_check_SCRIPTS += \ tests/java_framework_test.sh \ tests/java_log_test.sh \ tests/no_executor_framework_test.sh \ + tests/persistent_volume_framework_test.sh \ tests/python_framework_test.sh \ 
tests/test_framework_test.sh http://git-wip-us.apache.org/repos/asf/mesos/blob/4b15b960/src/examples/persistent_volume_framework.cpp ---------------------------------------------------------------------- diff --git a/src/examples/persistent_volume_framework.cpp b/src/examples/persistent_volume_framework.cpp new file mode 100644 index 0000000..8a893fc --- /dev/null +++ b/src/examples/persistent_volume_framework.cpp @@ -0,0 +1,497 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+#include <stdlib.h>
+
+#include <sstream>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include <mesos/mesos.hpp>
+#include <mesos/resources.hpp>
+#include <mesos/scheduler.hpp>
+#include <mesos/type_utils.hpp>
+
+#include <stout/flags.hpp>
+#include <stout/format.hpp>
+#include <stout/json.hpp>
+#include <stout/option.hpp>
+#include <stout/os.hpp>
+#include <stout/protobuf.hpp>
+#include <stout/stringify.hpp>
+#include <stout/uuid.hpp>
+
+#include "logging/flags.hpp"
+#include "logging/logging.hpp"
+
+using namespace mesos;
+using namespace mesos::internal;
+
+using std::cout;
+using std::endl;
+using std::ostringstream;
+using std::string;
+using std::vector;
+
+
+// Returns the resources a shard asks for before its persistent
+// volume has been created. The fixed description below is parsed
+// with `role` passed as the second argument, and the parse result
+// is unwrapped without checking it for an error.
+//
+// TODO(jieyu): Currently, persistent volume is only allowed for
+// reserved resources.
+static Resources SHARD_INITIAL_RESOURCES(const string& role)
+{
+  const string description = "cpus:0.1;mem:32;disk:16";
+
+  return Resources::parse(description, role).get();
+}
+
+
+// Builds the `Resource` describing a shard's persistent volume: a
+// "disk" resource of value "8" parsed with `role`, whose disk info
+// carries `persistenceId` as the persistence ID and a read-write
+// volume at `containerPath`.
+static Resource SHARD_PERSISTENT_VOLUME(
+    const string& role,
+    const string& persistenceId,
+    const string& containerPath)
+{
+  Resource resource = Resources::parse("disk", "8", role).get();
+
+  // Fill in the disk info in place on the parsed resource.
+  Resource::DiskInfo* disk = resource.mutable_disk();
+  disk->mutable_persistence()->set_id(persistenceId);
+
+  Volume* mount = disk->mutable_volume();
+  mount->set_container_path(containerPath);
+  mount->set_mode(Volume::RW);
+
+  return resource;
+}
+
+
+// Wraps `volumes` in an offer operation of type CREATE.
+static Offer::Operation CREATE(const Resources& volumes)
+{
+  Offer::Operation create;
+  create.set_type(Offer::Operation::CREATE);
+  create.mutable_create()->mutable_volumes()->CopyFrom(volumes);
+
+  return create;
+}
+
+
+// Wraps `tasks` in an offer operation of type LAUNCH, copying each
+// task into the operation in the order given.
+static Offer::Operation LAUNCH(const vector<TaskInfo>& tasks)
+{
+  Offer::Operation launch;
+  launch.set_type(Offer::Operation::LAUNCH);
+
+  // The launch message is only touched when there is a task to add.
+  for (const TaskInfo& info : tasks) {
+    launch.mutable_launch()->add_task_infos()->CopyFrom(info);
+  }
+
+  return launch;
+}
+
+
+// The framework launches a task on each registered slave using a
+// 
persistent volume. It restarts the task once the previous one on
+// the slave finishes. The framework terminates once the number of
+// tasks launched on each slave reaches a limit.
+//
+// Concretely, the scheduler tracks `numShards` shards. The first
+// task of a shard is launched together with the creation of the
+// shard's persistent volume and touches a file inside the volume;
+// every later task of the shard tests that the file still exists.
+// The driver is stopped once all shards are in the DONE state.
+class PersistentVolumeScheduler : public Scheduler
+{
+public:
+  // Creates `numShards` shards named "shard-<index>". Each shard
+  // uses the role of `_frameworkInfo` and is expected to launch
+  // `tasksPerShard` tasks.
+  PersistentVolumeScheduler(
+      const FrameworkInfo& _frameworkInfo,
+      size_t numShards,
+      size_t tasksPerShard)
+    : frameworkInfo(_frameworkInfo)
+  {
+    for (size_t i = 0; i < numShards; i++) {
+      shards.push_back(Shard(
+          "shard-" + stringify(i),
+          frameworkInfo.role(),
+          tasksPerShard));
+    }
+  }
+
+  // Logs the registration and remembers the assigned framework ID
+  // in our copy of the framework info.
+  virtual void registered(
+      SchedulerDriver* driver,
+      const FrameworkID& frameworkId,
+      const MasterInfo& masterInfo)
+  {
+    LOG(INFO) << "Registered with master " << masterInfo
+              << " and got framework ID " << frameworkId;
+
+    frameworkInfo.mutable_id()->CopyFrom(frameworkId);
+  }
+
+  // Only logs the event.
+  virtual void reregistered(
+      SchedulerDriver* driver,
+      const MasterInfo& masterInfo)
+  {
+    LOG(INFO) << "Reregistered with master " << masterInfo;
+  }
+
+  // Only logs the event.
+  virtual void disconnected(
+      SchedulerDriver* driver)
+  {
+    LOG(INFO) << "Disconnected!";
+  }
+
+  // For every offer, walks over all shards and collects the
+  // operations (volume creation and/or task launch) for the shards
+  // that can make progress with the offered resources. The offer is
+  // then accepted with the collected operations, even when the list
+  // is empty.
+  virtual void resourceOffers(
+      SchedulerDriver* driver,
+      const vector<Offer>& offers)
+  {
+    foreach (const Offer& offer, offers) {
+      LOG(INFO) << "Received offer " << offer.id() << " from slave "
+                << offer.slave_id() << " (" << offer.hostname() << ") "
+                << "with " << offer.resources();
+
+      Resources offered = offer.resources();
+
+      // The operations we will perform on the offer.
+      vector<Offer::Operation> operations;
+
+      foreach (Shard& shard, shards) {
+        switch (shard.state) {
+          case Shard::INIT:
+            // First launch: create the volume and launch the task
+            // that writes into it.
+            if (offered.contains(shard.resources)) {
+              Resource volume = SHARD_PERSISTENT_VOLUME(
+                  frameworkInfo.role(),
+                  UUID::random().toString(),
+                  "volume");
+
+              Try<Resources> resources = shard.resources.apply(CREATE(volume));
+              CHECK_SOME(resources);
+
+              TaskInfo task;
+              task.set_name(shard.name);
+              task.mutable_task_id()->set_value(UUID::random().toString());
+              task.mutable_slave_id()->CopyFrom(offer.slave_id());
+              task.mutable_resources()->CopyFrom(resources.get());
+              task.mutable_command()->set_value("touch volume/persisted");
+
+              // Update the shard. From now on the shard's resources
+              // are the ones that include the created volume.
+              shard.state = Shard::STAGING;
+              shard.taskId = task.task_id();
+              shard.volume.id = volume.disk().persistence().id();
+              shard.volume.slave = offer.slave_id().value();
+              shard.resources = resources.get();
+              shard.launched++;
+
+              operations.push_back(CREATE(volume));
+              operations.push_back(LAUNCH({task}));
+
+              // Apply the same operations to our local copy of the
+              // offer so that the remaining shards in this loop are
+              // matched against the updated resources.
+              resources = offered.apply(vector<Offer::Operation>{
+                  CREATE(volume),
+                  LAUNCH({task})});
+
+              CHECK_SOME(resources);
+              offered = resources.get();
+            }
+            break;
+          case Shard::WAITING:
+            // Re-launch: the volume already exists, so the task only
+            // checks that the file written by the first task is
+            // still there.
+            //
+            // NOTE(review): unlike the INIT branch, `offered` is not
+            // updated after the task has been added here. Confirm
+            // whether several waiting shards matching the same offer
+            // can over-commit it.
+            if (offered.contains(shard.resources)) {
+              CHECK_EQ(shard.volume.slave, offer.slave_id().value());
+
+              TaskInfo task;
+              task.set_name(shard.name);
+              task.mutable_task_id()->set_value(UUID::random().toString());
+              task.mutable_slave_id()->CopyFrom(offer.slave_id());
+              task.mutable_resources()->CopyFrom(shard.resources);
+              task.mutable_command()->set_value("test -f volume/persisted");
+
+              // Update the shard.
+              shard.state = Shard::STAGING;
+              shard.taskId = task.task_id();
+              shard.launched++;
+
+              operations.push_back(LAUNCH({task}));
+            }
+            break;
+          case Shard::STAGING:
+          case Shard::RUNNING:
+          case Shard::DONE:
+            // Ignore the offer.
+            break;
+          default:
+            LOG(ERROR) << "Unexpected shard state: " << shard.state;
+            driver->abort();
+            break;
+        }
+      }
+
+      driver->acceptOffers({offer.id()}, operations);
+    }
+  }
+
+  // Only logs the event.
+  virtual void offerRescinded(
+      SchedulerDriver* driver,
+      const OfferID& offerId)
+  {
+    LOG(INFO) << "Offer " << offerId << " has been rescinded";
+  }
+
+  // Advances the state of the shard whose current task the update
+  // belongs to, then stops the driver if every shard is DONE. Any
+  // task state other than staging, starting, running or finished
+  // aborts the driver.
+  virtual void statusUpdate(
+      SchedulerDriver* driver,
+      const TaskStatus& status)
+  {
+    LOG(INFO) << "Task '" << status.task_id() << "' is in state "
+              << status.state();
+
+    foreach (Shard& shard, shards) {
+      if (shard.taskId == status.task_id()) {
+        switch (status.state()) {
+          case TASK_RUNNING:
+            shard.state = Shard::RUNNING;
+            break;
+          case TASK_FINISHED:
+            // The shard is done once it has launched as many tasks
+            // as requested; otherwise it waits to be re-launched.
+            if (shard.launched >= shard.tasks) {
+              shard.state = Shard::DONE;
+            } else {
+              shard.state = Shard::WAITING;
+            }
+            break;
+          case TASK_STAGING:
+          case TASK_STARTING:
+            // Ignore the status update.
+            break;
+          default:
+            LOG(ERROR) << "Unexpected task state " << status.state()
+                       << " for task '" << status.task_id() << "'";
+            driver->abort();
+            break;
+        }
+
+        // Only the first shard with a matching task ID is updated.
+        break;
+      }
+    }
+
+    // Check the terminal condition.
+    bool terminal = true;
+    foreach (const Shard& shard, shards) {
+      if (shard.state != Shard::DONE) {
+        terminal = false;
+        break;
+      }
+    }
+
+    if (terminal) {
+      driver->stop();
+    }
+  }
+
+  // Only logs the event.
+  virtual void frameworkMessage(
+      SchedulerDriver* driver,
+      const ExecutorID& executorId,
+      const SlaveID& slaveId,
+      const string& data)
+  {
+    LOG(INFO) << "Received framework message from executor '" << executorId
+              << "' on slave " << slaveId << ": '" << data << "'";
+  }
+
+  // Only logs the event.
+  virtual void slaveLost(
+      SchedulerDriver* driver,
+      const SlaveID& slaveId)
+  {
+    LOG(INFO) << "Lost slave " << slaveId;
+  }
+
+  // Only logs the event.
+  virtual void executorLost(
+      SchedulerDriver* driver,
+      const ExecutorID& executorId,
+      const SlaveID& slaveId,
+      int status)
+  {
+    LOG(INFO) << "Lost executor '" << executorId << "' on slave "
+              << slaveId << ", status " << status;
+  }
+
+  // Only logs the error message.
+  virtual void error(
+      SchedulerDriver* driver,
+      const string& message)
+  {
+    LOG(ERROR) << message;
+  }
+
+private:
+  // The scheduler's bookkeeping for one shard.
+  struct Shard
+  {
+    enum State
+    {
+      INIT = 0,   // The shard hasn't been launched yet.
+      STAGING,    // The shard has been launched.
+      RUNNING,    // The shard is running.
+      WAITING,    // The shard is waiting to be re-launched.
+      DONE,       // The shard has finished all tasks.
+
+      // TODO(jieyu): Add another state so that we can track the
+      // destroy of the volume once all tasks finish.
+    };
+
+    // The persistent volume associated with this shard.
+    struct Volume
+    {
+      // The persistence ID.
+      string id;
+
+      // An identifier used to uniquely identify a slave (even across
+      // reboot). In the test, we use the slave ID since slaves will not
+      // be rebooted. Note that we cannot use hostname as the identifier
+      // in a local cluster because all slaves share the same hostname.
+      string slave;
+    };
+
+    // A new shard starts in the INIT state with the initial shard
+    // resources for `role` and no task launched yet.
+    Shard(const string& _name, const string& role, size_t _tasks)
+      : name(_name),
+        state(INIT),
+        resources(SHARD_INITIAL_RESOURCES(role)),
+        launched(0),
+        tasks(_tasks) {}
+
+    string name;
+    State state;          // The current state of this shard.
+    TaskID taskId;        // The ID of the current task.
+    Volume volume;        // The persistent volume associated with the shard.
+    Resources resources;  // Resources required to launch the shard.
+    size_t launched;      // How many tasks this shard has launched.
+    size_t tasks;         // How many tasks this shard should launch.
+  };
+
+  FrameworkInfo frameworkInfo;
+  vector<Shard> shards;
+};
+
+
+// Command line flags of the example framework, added on top of the
+// logging flags. `master` has no default; `role` and `principal`
+// default to "test", `num_shards` and `tasks_per_shard` to 3, and
+// `help` to false.
+class Flags : public logging::Flags
+{
+public:
+  Flags()
+  {
+    add(&master,
+        "master",
+        "The master to connect to. May be one of:\n"
+        " master@addr:port (The PID of the master)\n"
+        " zk://host1:port1,host2:port2,.../path\n"
+        " zk://username:password@host1:port1,host2:port2,.../path\n"
+        " file://path/to/file (where file contains one of the above)");
+
+    add(&role,
+        "role",
+        "Role to use when registering",
+        "test");
+
+    add(&principal,
+        "principal",
+        "The principal used to identify this framework",
+        "test");
+
+    add(&num_shards,
+        "num_shards",
+        "The number of shards the framework will run.",
+        3);
+
+    add(&tasks_per_shard,
+        "tasks_per_shard",
+        "The number of tasks should be launched per shard.",
+        3);
+
+    add(&help,
+        "help",
+        "Print this help message",
+        false);
+  }
+
+  Option<string> master;
+  string role;
+  string principal;
+  size_t num_shards;
+  size_t tasks_per_shard;
+  bool help;
+};
+
+
+// Builds the usage text: a "Usage:" line with the base name of
+// `argv0`, followed by the usage string reported by `flags`.
+static string usage(const char* argv0, const flags::FlagsBase& flags)
+{
+  ostringstream stream;
+
+  stream << "Usage: " << os::basename(argv0).get() << " [...]" << endl
+         << endl
+         << "Supported options:" << endl
+         << flags.usage();
+
+  return stream.str();
+}
+
+
+// Loads the flags, sets up the framework info, runs the scheduler
+// driver and returns 0 if the driver ended in DRIVER_STOPPED and 1
+// otherwise. A flag loading error, `--help` and a missing `--master`
+// all end the program through EXIT(1).
+int main(int argc, char** argv)
+{
+  Flags flags;
+
+  Try<Nothing> load = flags.load("MESOS_", argc, argv);
+
+  if (load.isError()) {
+    EXIT(1) << load.error() << endl << usage(argv[0], flags);
+  }
+
+  if (flags.help) {
+    EXIT(1) << usage(argv[0], flags);
+  }
+
+  if (flags.master.isNone()) {
+    EXIT(1) << "Missing required option --master. See --help";
+  }
+
+  logging::initialize(argv[0], flags, true); // Catch signals.
+
+  FrameworkInfo framework;
+  framework.set_user(""); // Have Mesos fill in the current user.
+  framework.set_name("Persistent Volume Framework (C++)");
+  framework.set_role(flags.role);
+  framework.set_checkpoint(true);
+  framework.set_principal(flags.principal);
+
+  // With `--master=local` the master and slave settings are passed
+  // through MESOS_* environment variables set up below.
+  if (flags.master.get() == "local") {
+    // Configure master.
+    os::setenv("MESOS_ROLES", flags.role);
+    os::setenv("MESOS_AUTHENTICATE", "false");
+
+    // Let any principal register frameworks with our role.
+    ACLs acls;
+    ACL::RegisterFramework* acl = acls.add_register_frameworks();
+    acl->mutable_principals()->set_type(ACL::Entity::ANY);
+    acl->mutable_roles()->add_values(flags.role);
+
+    os::setenv("MESOS_ACLS", stringify(JSON::Protobuf(acls)));
+
+    // Configure slave.
+    os::setenv("MESOS_DEFAULT_ROLE", flags.role);
+
+    // Use the directory containing this binary as the launcher
+    // directory and add it to the library search paths.
+    const string launcherDir = os::dirname(os::realpath(argv[0]).get()).get();
+    os::setenv("MESOS_LAUNCHER_DIR", launcherDir);
+    os::libraries::appendPaths(launcherDir);
+  }
+
+  PersistentVolumeScheduler scheduler(
+      framework,
+      flags.num_shards,
+      flags.tasks_per_shard);
+
+  MesosSchedulerDriver* driver = new MesosSchedulerDriver(
+      &scheduler,
+      framework,
+      flags.master.get());
+
+  int status = driver->run() == DRIVER_STOPPED ? 0 : 1;
+
+  driver->stop();
+  delete driver;
+  return status;
+}

http://git-wip-us.apache.org/repos/asf/mesos/blob/4b15b960/src/tests/examples_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/examples_tests.cpp b/src/tests/examples_tests.cpp
index 5222b6d..f85b815 100644
--- a/src/tests/examples_tests.cpp
+++ b/src/tests/examples_tests.cpp
@@ -32,6 +32,9 @@ TEST_SCRIPT(ExamplesTest, LowLevelSchedulerLibprocess,
 TEST_SCRIPT(ExamplesTest, LowLevelSchedulerPthread,
             "low_level_scheduler_pthread_test.sh")
 
+TEST_SCRIPT(ExamplesTest, PersistentVolumeFramework,
+            "persistent_volume_framework_test.sh")
+
 #ifdef MESOS_HAS_JAVA
 TEST_SCRIPT(ExamplesTest, JavaFramework, "java_framework_test.sh")
 TEST_SCRIPT(ExamplesTest, JavaException, "java_exception_test.sh")

http://git-wip-us.apache.org/repos/asf/mesos/blob/4b15b960/src/tests/persistent_volume_framework_test.sh
----------------------------------------------------------------------
diff --git a/src/tests/persistent_volume_framework_test.sh b/src/tests/persistent_volume_framework_test.sh
new file mode 100755
index 0000000..c96fb70
--- /dev/null
+++ b/src/tests/persistent_volume_framework_test.sh
@@ -0,0 +1,28 @@
+#!/usr/bin/env bash
+
+# Expecting MESOS_SOURCE_DIR and MESOS_BUILD_DIR to be in environment.
+
+env | grep MESOS_SOURCE_DIR >/dev/null
+
+test $? != 0 && \
+  echo "Failed to find MESOS_SOURCE_DIR in environment" && \
+  exit 1
+
+env | grep MESOS_BUILD_DIR >/dev/null
+
+test $? != 0 && \
+  echo "Failed to find MESOS_BUILD_DIR in environment" && \
+  exit 1
+
+source ${MESOS_SOURCE_DIR}/support/atexit.sh
+
+MESOS_WORK_DIR=`mktemp -d -t mesos-XXXXXX`
+
+atexit "rm -rf ${MESOS_WORK_DIR}"
+export MESOS_WORK_DIR=${MESOS_WORK_DIR}
+
+# Set local Mesos runner to use 3 slaves
+export MESOS_NUM_SLAVES=3
+
+# Check that the framework executes without crashing (returns 0).
+exec ${MESOS_BUILD_DIR}/src/persistent-volume-framework --master=local
