Update Mesos executor to use synchronized. Review: https://reviews.apache.org/r/35097
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8939609d Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8939609d Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8939609d Branch: refs/heads/master Commit: 8939609d403aa1043d637cc03647c4ee40478b20 Parents: b2d8047 Author: Joris Van Remoortere <[email protected]> Authored: Sat Jun 13 07:12:31 2015 -0700 Committer: Benjamin Hindman <[email protected]> Committed: Sun Jun 14 02:43:01 2015 -0700 ---------------------------------------------------------------------- src/exec/exec.cpp | 287 +++++++++++++++++++++++++------------------------ 1 file changed, 145 insertions(+), 142 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/8939609d/src/exec/exec.cpp ---------------------------------------------------------------------- diff --git a/src/exec/exec.cpp b/src/exec/exec.cpp index 0dfd5a6..930dda9 100644 --- a/src/exec/exec.cpp +++ b/src/exec/exec.cpp @@ -43,9 +43,9 @@ #include <stout/os.hpp> #include <stout/stopwatch.hpp> #include <stout/stringify.hpp> +#include <stout/synchronized.hpp> #include <stout/uuid.hpp> -#include "common/lock.hpp" #include "common/protobuf_utils.hpp" #include "logging/flags.hpp" @@ -404,8 +404,9 @@ protected: { terminate(self()); - Lock lock(mutex); - pthread_cond_signal(cond); + synchronized (mutex) { + pthread_cond_signal(cond); + } } void abort() @@ -413,8 +414,9 @@ protected: LOG(INFO) << "Deactivating the executor libprocess"; CHECK(aborted); - Lock lock(mutex); - pthread_cond_signal(cond); + synchronized (mutex) { + pthread_cond_signal(cond); + } } void _recoveryTimeout(UUID _connection) @@ -611,173 +613,174 @@ MesosExecutorDriver::~MesosExecutorDriver() Status MesosExecutorDriver::start() { - Lock lock(&mutex); + synchronized (mutex) { + if (status != DRIVER_NOT_STARTED) { + return status; + } - if (status != DRIVER_NOT_STARTED) { - return status; - } + // Set stream buffering mode to flush on newlines so that we + // capture logs from user processes even when output is redirected + // to a file. + setvbuf(stdout, 0, _IOLBF, 0); + setvbuf(stderr, 0, _IOLBF, 0); - // Set stream buffering mode to flush on newlines so that we capture logs - // from user processes even when output is redirected to a file. - setvbuf(stdout, 0, _IOLBF, 0); - setvbuf(stderr, 0, _IOLBF, 0); + bool local; - bool local; + UPID slave; + SlaveID slaveId; + FrameworkID frameworkId; + ExecutorID executorId; + string workDirectory; + bool checkpoint; - UPID slave; - SlaveID slaveId; - FrameworkID frameworkId; - ExecutorID executorId; - string workDirectory; - bool checkpoint; - - Option<string> value; - std::istringstream iss; + Option<string> value; + std::istringstream iss; - // Check if this is local (for example, for testing). - local = os::getenv("MESOS_LOCAL").isSome(); + // Check if this is local (for example, for testing). + local = os::getenv("MESOS_LOCAL").isSome(); - // Get slave PID from environment. - value = os::getenv("MESOS_SLAVE_PID"); - if (value.isNone()) { - EXIT(1) << "Expecting 'MESOS_SLAVE_PID' to be set in the environment."; - } + // Get slave PID from environment. + value = os::getenv("MESOS_SLAVE_PID"); + if (value.isNone()) { + EXIT(1) << "Expecting 'MESOS_SLAVE_PID' to be set in the environment."; + } - slave = UPID(value.get()); - CHECK(slave) << "Cannot parse MESOS_SLAVE_PID '" << value.get() << "'"; + slave = UPID(value.get()); + CHECK(slave) << "Cannot parse MESOS_SLAVE_PID '" << value.get() << "'"; - // Get slave ID from environment. - value = os::getenv("MESOS_SLAVE_ID"); - if (value.isNone()) { - EXIT(1) << "Expecting 'MESOS_SLAVE_ID' to be set in the environment."; - } - slaveId.set_value(value.get()); + // Get slave ID from environment. + value = os::getenv("MESOS_SLAVE_ID"); + if (value.isNone()) { + EXIT(1) << "Expecting 'MESOS_SLAVE_ID' to be set in the environment."; + } + slaveId.set_value(value.get()); - // Get framework ID from environment. - value = os::getenv("MESOS_FRAMEWORK_ID"); - if (value.isNone()) { - EXIT(1) << "Expecting 'MESOS_FRAMEWORK_ID' to be set in the environment."; - } - frameworkId.set_value(value.get()); + // Get framework ID from environment. + value = os::getenv("MESOS_FRAMEWORK_ID"); + if (value.isNone()) { + EXIT(1) << "Expecting 'MESOS_FRAMEWORK_ID' to be set in the environment."; + } + frameworkId.set_value(value.get()); - // Get executor ID from environment. - value = os::getenv("MESOS_EXECUTOR_ID"); - if (value.isNone()) { - EXIT(1) << "Expecting 'MESOS_EXECUTOR_ID' to be set in the environment."; - } - executorId.set_value(value.get()); + // Get executor ID from environment. + value = os::getenv("MESOS_EXECUTOR_ID"); + if (value.isNone()) { + EXIT(1) << "Expecting 'MESOS_EXECUTOR_ID' to be set in the environment."; + } + executorId.set_value(value.get()); - // Get working directory from environment. - value = os::getenv("MESOS_DIRECTORY"); - if (value.isNone()) { - EXIT(1) << "Expecting 'MESOS_DIRECTORY' to be set in the environment."; - } - workDirectory = value.get(); + // Get working directory from environment. + value = os::getenv("MESOS_DIRECTORY"); + if (value.isNone()) { + EXIT(1) << "Expecting 'MESOS_DIRECTORY' to be set in the environment."; + } + workDirectory = value.get(); - // Get checkpointing status from environment. - value = os::getenv("MESOS_CHECKPOINT"); - checkpoint = value.isSome() && value.get() == "1"; + // Get checkpointing status from environment. + value = os::getenv("MESOS_CHECKPOINT"); + checkpoint = value.isSome() && value.get() == "1"; - Duration recoveryTimeout = slave::RECOVERY_TIMEOUT; + Duration recoveryTimeout = slave::RECOVERY_TIMEOUT; - // Get the recovery timeout if checkpointing is enabled. - if (checkpoint) { - value = os::getenv("MESOS_RECOVERY_TIMEOUT"); + // Get the recovery timeout if checkpointing is enabled. + if (checkpoint) { + value = os::getenv("MESOS_RECOVERY_TIMEOUT"); - if (value.isSome()) { - Try<Duration> _recoveryTimeout = Duration::parse(value.get()); + if (value.isSome()) { + Try<Duration> _recoveryTimeout = Duration::parse(value.get()); - CHECK_SOME(_recoveryTimeout) - << "Cannot parse MESOS_RECOVERY_TIMEOUT '" << value.get() << "': " - << _recoveryTimeout.error(); + CHECK_SOME(_recoveryTimeout) + << "Cannot parse MESOS_RECOVERY_TIMEOUT '" << value.get() << "': " + << _recoveryTimeout.error(); - recoveryTimeout = _recoveryTimeout.get(); + recoveryTimeout = _recoveryTimeout.get(); + } } - } - CHECK(process == NULL); - - process = new ExecutorProcess( - slave, - this, - executor, - slaveId, - frameworkId, - executorId, - local, - workDirectory, - checkpoint, - recoveryTimeout, - &mutex, - &cond); - - spawn(process); - - return status = DRIVER_RUNNING; + CHECK(process == NULL); + + process = new ExecutorProcess( + slave, + this, + executor, + slaveId, + frameworkId, + executorId, + local, + workDirectory, + checkpoint, + recoveryTimeout, + &mutex, + &cond); + + spawn(process); + + return status = DRIVER_RUNNING; + } } Status MesosExecutorDriver::stop() { - Lock lock(&mutex); - - if (status != DRIVER_RUNNING && status != DRIVER_ABORTED) { - return status; - } + synchronized (mutex) { + if (status != DRIVER_RUNNING && status != DRIVER_ABORTED) { + return status; + } - CHECK(process != NULL); + CHECK(process != NULL); - dispatch(process, &ExecutorProcess::stop); + dispatch(process, &ExecutorProcess::stop); - bool aborted = status == DRIVER_ABORTED; + bool aborted = status == DRIVER_ABORTED; - status = DRIVER_STOPPED; + status = DRIVER_STOPPED; - return aborted ? DRIVER_ABORTED : status; + return aborted ? DRIVER_ABORTED : status; + } } Status MesosExecutorDriver::abort() { - Lock lock(&mutex); - - if (status != DRIVER_RUNNING) { - return status; - } + synchronized (mutex) { + if (status != DRIVER_RUNNING) { + return status; + } - CHECK(process != NULL); + CHECK(process != NULL); - // We set the volatile aborted to true here to prevent any further - // messages from being processed in the ExecutorProcess. However, - // if abort() is called from another thread as the ExecutorProcess, - // there may be at most one additional message processed. - // TODO(bmahler): Use an atomic boolean. - process->aborted = true; + // We set the volatile aborted to true here to prevent any further + // messages from being processed in the ExecutorProcess. However, + // if abort() is called from another thread as the ExecutorProcess, + // there may be at most one additional message processed. + // TODO(bmahler): Use an atomic boolean. + process->aborted = true; - // Dispatching here ensures that we still process the outstanding - // requests *from* the executor, since those do proceed when - // aborted is true. - dispatch(process, &ExecutorProcess::abort); + // Dispatching here ensures that we still process the outstanding + // requests *from* the executor, since those do proceed when + // aborted is true. + dispatch(process, &ExecutorProcess::abort); - return status = DRIVER_ABORTED; + return status = DRIVER_ABORTED; + } } Status MesosExecutorDriver::join() { - Lock lock(&mutex); - - if (status != DRIVER_RUNNING) { - return status; - } + synchronized (mutex) { + if (status != DRIVER_RUNNING) { + return status; + } - while (status == DRIVER_RUNNING) { - pthread_cond_wait(&cond, &mutex); - } + while (status == DRIVER_RUNNING) { + pthread_cond_wait(&cond, &mutex); + } - CHECK(status == DRIVER_ABORTED || status == DRIVER_STOPPED); + CHECK(status == DRIVER_ABORTED || status == DRIVER_STOPPED); - return status; + return status; + } } @@ -790,31 +793,31 @@ Status MesosExecutorDriver::run() Status MesosExecutorDriver::sendStatusUpdate(const TaskStatus& taskStatus) { - Lock lock(&mutex); - - if (status != DRIVER_RUNNING) { - return status; - } + synchronized (mutex) { + if (status != DRIVER_RUNNING) { + return status; + } - CHECK(process != NULL); + CHECK(process != NULL); - dispatch(process, &ExecutorProcess::sendStatusUpdate, taskStatus); + dispatch(process, &ExecutorProcess::sendStatusUpdate, taskStatus); - return status; + return status; + } } Status MesosExecutorDriver::sendFrameworkMessage(const string& data) { - Lock lock(&mutex); - - if (status != DRIVER_RUNNING) { - return status; - } + synchronized (mutex) { + if (status != DRIVER_RUNNING) { + return status; + } - CHECK(process != NULL); + CHECK(process != NULL); - dispatch(process, &ExecutorProcess::sendFrameworkMessage, data); + dispatch(process, &ExecutorProcess::sendFrameworkMessage, data); - return status; + return status; + } }
