http://git-wip-us.apache.org/repos/asf/mesos/blob/c5a68be1/src/python/native/module.cpp ---------------------------------------------------------------------- diff --git a/src/python/native/module.cpp b/src/python/native/module.cpp deleted file mode 100644 index f523c1f..0000000 --- a/src/python/native/module.cpp +++ /dev/null @@ -1,102 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * This file defines the _mesos.so binary module used by the Mesos Python API. - * This module contains private implementations of MesosSchedulerDriver and - * MesosExecutorDriver as Python types that get called from the public module - * called mesos (in <root>/src/python/src/mesos.py). This design was chosen - * so that most of the API (e.g. the Scheduler and Executor interfaces) can - * be written in Python, and only the parts that need to call into C++ are - * in C++. Note that the mesos module also contains public classes called - * MesosSchedulerDriver and MesosExecutorDriver. These call into the private - * _mesos.MesosSchedulerDriverImpl and _mesos.MesosExecutorDriverImpl. - */ - -// Python.h must be included before standard headers. 
-// See: http://docs.python.org/2/c-api/intro.html#include-files -#include <Python.h> - -#include <iostream> - -#include <mesos/executor.hpp> -#include <mesos/scheduler.hpp> - -#include "module.hpp" -#include "proxy_scheduler.hpp" -#include "mesos_scheduler_driver_impl.hpp" -#include "proxy_executor.hpp" -#include "mesos_executor_driver_impl.hpp" - -using namespace mesos; -using namespace mesos::python; - -using std::string; -using std::vector; -using std::map; - - -/** - * The Python module object for mesos_pb2 (which contains the protobuf - * classes generated for Python). - */ -PyObject* mesos::python::mesos_pb2 = NULL; - - -namespace { - -/** - * Method list for our Python module. - */ -PyMethodDef MODULE_METHODS[] = { - {NULL, NULL, 0, NULL} /* Sentinel */ -}; - -} // namespace { - - -/** - * Entry point called by Python to initialize our module. - */ -PyMODINIT_FUNC init_mesos(void) -{ - // Ensure that the interpreter's threading support is enabled - PyEval_InitThreads(); - - // Import the mesos_pb2 module (on which we depend for protobuf classes) - mesos_pb2 = PyImport_ImportModule("mesos_pb2"); - if (mesos_pb2 == NULL) - return; - - // Initialize our Python types - if (PyType_Ready(&MesosSchedulerDriverImplType) < 0) - return; - if (PyType_Ready(&MesosExecutorDriverImplType) < 0) - return; - - // Create the _mesos module and add our types to it - PyObject* module = Py_InitModule("_mesos", MODULE_METHODS); - Py_INCREF(&MesosSchedulerDriverImplType); - PyModule_AddObject(module, - "MesosSchedulerDriverImpl", - (PyObject*) &MesosSchedulerDriverImplType); - Py_INCREF(&MesosExecutorDriverImplType); - PyModule_AddObject(module, - "MesosExecutorDriverImpl", - (PyObject*) &MesosExecutorDriverImplType); -}
http://git-wip-us.apache.org/repos/asf/mesos/blob/c5a68be1/src/python/native/module.hpp ---------------------------------------------------------------------- diff --git a/src/python/native/module.hpp b/src/python/native/module.hpp deleted file mode 100644 index 1c35e2e..0000000 --- a/src/python/native/module.hpp +++ /dev/null @@ -1,138 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef MODULE_HPP -#define MODULE_HPP - -// Python.h must be included before standard headers. -// See: http://docs.python.org/2/c-api/intro.html#include-files -#include <Python.h> - -#include <iostream> - -#include <google/protobuf/io/zero_copy_stream_impl.h> - - -namespace mesos { namespace python { - -/** - * The Python module object for mesos_pb2 (which contains the protobuf - * classes generated for Python). - */ -extern PyObject* mesos_pb2; - - -/** - * RAII utility class for acquiring the Python global interpreter lock. 
- */ -class InterpreterLock { - PyGILState_STATE state; - -public: - InterpreterLock() { - state = PyGILState_Ensure(); - } - - ~InterpreterLock() { - PyGILState_Release(state); - } -}; - - -/** - * Convert a Python protocol buffer object into a C++ one by serializing - * it to a string and deserializing the result back in C++. Returns true - * on success, or prints an error and returns false on failure. - */ -template <typename T> -bool readPythonProtobuf(PyObject* obj, T* t) -{ - if (obj == Py_None) { - std::cerr << "None object given where protobuf expected" << std::endl; - return false; - } - PyObject* res = PyObject_CallMethod(obj, - (char*) "SerializeToString", - (char*) NULL); - if (res == NULL) { - std::cerr << "Failed to call Python object's SerializeToString " - << "(perhaps it is not a protobuf?)" << std::endl; - PyErr_Print(); - return false; - } - char* chars; - Py_ssize_t len; - if (PyString_AsStringAndSize(res, &chars, &len) < 0) { - std::cerr << "SerializeToString did not return a string" << std::endl; - PyErr_Print(); - Py_DECREF(res); - return false; - } - google::protobuf::io::ArrayInputStream stream(chars, len); - bool success = t->ParseFromZeroCopyStream(&stream); - if (!success) { - std::cerr << "Could not deserialize protobuf as expected type" << std::endl; - } - Py_DECREF(res); - return success; -} - - -/** - * Convert a C++ protocol buffer object into a Python one by serializing - * it to a string and deserializing the result back in Python. Returns the - * resulting PyObject* on success or raises a Python exception and returns - * NULL on failure. 
- */ -template <typename T> -PyObject* createPythonProtobuf(const T& t, const char* typeName) -{ - PyObject* dict = PyModule_GetDict(mesos_pb2); - if (dict == NULL) { - PyErr_Format(PyExc_Exception, "PyModule_GetDict failed"); - return NULL; - } - - PyObject* type = PyDict_GetItemString(dict, typeName); - if (type == NULL) { - PyErr_Format(PyExc_Exception, "Could not resolve mesos_pb2.%s", typeName); - return NULL; - } - if (!PyType_Check(type)) { - PyErr_Format(PyExc_Exception, "mesos_pb2.%s is not a type", typeName); - return NULL; - } - - std::string str; - if (!t.SerializeToString(&str)) { - PyErr_Format(PyExc_Exception, "C++ %s SerializeToString failed", typeName); - return NULL; - } - - // Propagates any exception that might happen in FromString - return PyObject_CallMethod(type, - (char*) "FromString", - (char*) "s#", - str.data(), - str.size()); -} - -} // namespace python { -} // namespace mesos { - -#endif /* MODULE_HPP */ http://git-wip-us.apache.org/repos/asf/mesos/blob/c5a68be1/src/python/native/proxy_executor.cpp ---------------------------------------------------------------------- diff --git a/src/python/native/proxy_executor.cpp b/src/python/native/proxy_executor.cpp deleted file mode 100644 index 5e8637e..0000000 --- a/src/python/native/proxy_executor.cpp +++ /dev/null @@ -1,275 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// Python.h must be included before standard headers. -// See: http://docs.python.org/2/c-api/intro.html#include-files -#include <Python.h> - -#include <iostream> - -#include "proxy_executor.hpp" -#include "module.hpp" -#include "mesos_executor_driver_impl.hpp" - -using namespace mesos; - -using std::cerr; -using std::endl; -using std::string; -using std::vector; -using std::map; - -namespace mesos { -namespace python { - -void ProxyExecutor::registered(ExecutorDriver* driver, - const ExecutorInfo& executorInfo, - const FrameworkInfo& frameworkInfo, - const SlaveInfo& slaveInfo) -{ - InterpreterLock lock; - - PyObject* executorInfoObj = NULL; - PyObject* frameworkInfoObj = NULL; - PyObject* slaveInfoObj = NULL; - PyObject* res = NULL; - - executorInfoObj = createPythonProtobuf(executorInfo, "ExecutorInfo"); - frameworkInfoObj = createPythonProtobuf(frameworkInfo, "FrameworkInfo"); - slaveInfoObj = createPythonProtobuf(slaveInfo, "SlaveInfo"); - - if (executorInfoObj == NULL || - frameworkInfoObj == NULL || - slaveInfoObj == NULL) { - goto cleanup; // createPythonProtobuf will have set an exception - } - - res = PyObject_CallMethod(impl->pythonExecutor, - (char*) "registered", - (char*) "OOOO", - impl, - executorInfoObj, - frameworkInfoObj, - slaveInfoObj); - if (res == NULL) { - cerr << "Failed to call executor registered" << endl; - goto cleanup; - } - -cleanup: - if (PyErr_Occurred()) { - PyErr_Print(); - driver->abort(); - } - Py_XDECREF(executorInfoObj); - Py_XDECREF(frameworkInfoObj); - Py_XDECREF(slaveInfoObj); - Py_XDECREF(res); -} - - 
-void ProxyExecutor::reregistered(ExecutorDriver* driver, - const SlaveInfo& slaveInfo) -{ - InterpreterLock lock; - - PyObject* slaveInfoObj = NULL; - PyObject* res = NULL; - - slaveInfoObj = createPythonProtobuf(slaveInfo, "SlaveInfo"); - - if (slaveInfoObj == NULL) { - goto cleanup; // createPythonProtobuf will have set an exception - } - - res = PyObject_CallMethod(impl->pythonExecutor, - (char*) "reregistered", - (char*) "OO", - impl, - slaveInfoObj); - if (res == NULL) { - cerr << "Failed to call executor re-registered" << endl; - goto cleanup; - } - -cleanup: - if (PyErr_Occurred()) { - PyErr_Print(); - driver->abort(); - } - Py_XDECREF(slaveInfoObj); - Py_XDECREF(res); -} - - -void ProxyExecutor::disconnected(ExecutorDriver* driver) -{ - InterpreterLock lock; - PyObject* res = PyObject_CallMethod(impl->pythonExecutor, - (char*) "disconnected", - (char*) "O", - impl); - if (res == NULL) { - cerr << "Failed to call executor's disconnected" << endl; - goto cleanup; - } -cleanup: - if (PyErr_Occurred()) { - PyErr_Print(); - driver->abort(); - } - Py_XDECREF(res); -} - - -void ProxyExecutor::launchTask(ExecutorDriver* driver, - const TaskInfo& task) -{ - InterpreterLock lock; - - PyObject* taskObj = NULL; - PyObject* res = NULL; - - taskObj = createPythonProtobuf(task, "TaskInfo"); - if (taskObj == NULL) { - goto cleanup; // createPythonProtobuf will have set an exception - } - - res = PyObject_CallMethod(impl->pythonExecutor, - (char*) "launchTask", - (char*) "OO", - impl, - taskObj); - if (res == NULL) { - cerr << "Failed to call executor's launchTask" << endl; - goto cleanup; - } - -cleanup: - if (PyErr_Occurred()) { - PyErr_Print(); - driver->abort(); - } - Py_XDECREF(taskObj); - Py_XDECREF(res); -} - - -void ProxyExecutor::killTask(ExecutorDriver* driver, - const TaskID& taskId) -{ - InterpreterLock lock; - - PyObject* taskIdObj = NULL; - PyObject* res = NULL; - - taskIdObj = createPythonProtobuf(taskId, "TaskID"); - if (taskIdObj == NULL) { - goto cleanup; 
// createPythonProtobuf will have set an exception - } - - res = PyObject_CallMethod(impl->pythonExecutor, - (char*) "killTask", - (char*) "OO", - impl, - taskIdObj); - if (res == NULL) { - cerr << "Failed to call executor's killTask" << endl; - goto cleanup; - } - -cleanup: - if (PyErr_Occurred()) { - PyErr_Print(); - driver->abort(); - } - Py_XDECREF(taskIdObj); - Py_XDECREF(res); -} - - -void ProxyExecutor::frameworkMessage(ExecutorDriver* driver, - const string& data) -{ - InterpreterLock lock; - - PyObject* res = NULL; - - res = PyObject_CallMethod(impl->pythonExecutor, - (char*) "frameworkMessage", - (char*) "Os#", - impl, - data.data(), - data.length()); - if (res == NULL) { - cerr << "Failed to call executor's frameworkMessage" << endl; - goto cleanup; - } - -cleanup: - if (PyErr_Occurred()) { - PyErr_Print(); - driver->abort(); - } - Py_XDECREF(res); -} - - -void ProxyExecutor::shutdown(ExecutorDriver* driver) -{ - InterpreterLock lock; - PyObject* res = PyObject_CallMethod(impl->pythonExecutor, - (char*) "shutdown", - (char*) "O", - impl); - if (res == NULL) { - cerr << "Failed to call executor's shutdown" << endl; - goto cleanup; - } -cleanup: - if (PyErr_Occurred()) { - PyErr_Print(); - driver->abort(); - } - Py_XDECREF(res); -} - - -void ProxyExecutor::error(ExecutorDriver* driver, const string& message) -{ - InterpreterLock lock; - PyObject* res = PyObject_CallMethod(impl->pythonExecutor, - (char*) "error", - (char*) "Os#", - impl, - message.data(), - message.length()); - if (res == NULL) { - cerr << "Failed to call executor's error" << endl; - goto cleanup; - } -cleanup: - if (PyErr_Occurred()) { - PyErr_Print(); - // No need for driver.stop(); it should stop itself - } - Py_XDECREF(res); -} - -} // namespace python { -} // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/c5a68be1/src/python/native/proxy_executor.hpp ---------------------------------------------------------------------- diff --git 
a/src/python/native/proxy_executor.hpp b/src/python/native/proxy_executor.hpp deleted file mode 100644 index eeefc5e..0000000 --- a/src/python/native/proxy_executor.hpp +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef PROXY_EXECUTOR_HPP -#define PROXY_EXECUTOR_HPP - -// Python.h must be included before standard headers. -// See: http://docs.python.org/2/c-api/intro.html#include-files -#include <Python.h> - -#include <string> -#include <vector> - -#include <mesos/executor.hpp> - -namespace mesos { -namespace python { - -struct MesosExecutorDriverImpl; - -/** - * Proxy Executor implementation that will call into Python. 
- */ -class ProxyExecutor : public Executor -{ -public: - explicit ProxyExecutor(MesosExecutorDriverImpl *_impl) : impl(_impl) {} - - virtual ~ProxyExecutor() {} - - virtual void registered(ExecutorDriver* driver, - const ExecutorInfo& executorInfo, - const FrameworkInfo& frameworkInfo, - const SlaveInfo& slaveInfo); - virtual void reregistered(ExecutorDriver* driver, const SlaveInfo& slaveInfo); - virtual void disconnected(ExecutorDriver* driver); - virtual void launchTask(ExecutorDriver* driver, const TaskInfo& task); - virtual void killTask(ExecutorDriver* driver, const TaskID& taskId); - virtual void frameworkMessage(ExecutorDriver* driver, - const std::string& data); - virtual void shutdown(ExecutorDriver* driver); - virtual void error(ExecutorDriver* driver, const std::string& message); - -private: - MesosExecutorDriverImpl *impl; -}; - -} // namespace python { -} // namespace mesos { - -#endif // PROXY_EXECUTOR_HPP http://git-wip-us.apache.org/repos/asf/mesos/blob/c5a68be1/src/python/native/proxy_scheduler.cpp ---------------------------------------------------------------------- diff --git a/src/python/native/proxy_scheduler.cpp b/src/python/native/proxy_scheduler.cpp deleted file mode 100644 index 95b09cf..0000000 --- a/src/python/native/proxy_scheduler.cpp +++ /dev/null @@ -1,386 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// Python.h must be included before standard headers. -// See: http://docs.python.org/2/c-api/intro.html#include-files -#include <Python.h> - -#include <iostream> - -#include "proxy_scheduler.hpp" -#include "module.hpp" -#include "mesos_scheduler_driver_impl.hpp" - -using namespace mesos; - -using std::cerr; -using std::endl; -using std::string; -using std::vector; -using std::map; - -namespace mesos { -namespace python { - -void ProxyScheduler::registered(SchedulerDriver* driver, - const FrameworkID& frameworkId, - const MasterInfo& masterInfo) -{ - InterpreterLock lock; - - PyObject* fid = NULL; - PyObject* minfo = NULL; - PyObject* res = NULL; - - fid = createPythonProtobuf(frameworkId, "FrameworkID"); - if (fid == NULL) { - goto cleanup; // createPythonProtobuf will have set an exception - } - - minfo = createPythonProtobuf(masterInfo, "MasterInfo"); - if (minfo == NULL) { - goto cleanup; // createPythonProtobuf will have set an exception - } - - res = PyObject_CallMethod(impl->pythonScheduler, - (char*) "registered", - (char*) "OOO", - impl, - fid, - minfo); - if (res == NULL) { - cerr << "Failed to call scheduler's registered" << endl; - goto cleanup; - } - -cleanup: - if (PyErr_Occurred()) { - PyErr_Print(); - driver->abort(); - } - Py_XDECREF(fid); - Py_XDECREF(minfo); - Py_XDECREF(res); -} - - -void ProxyScheduler::reregistered(SchedulerDriver* driver, - const MasterInfo& masterInfo) -{ - InterpreterLock lock; - - PyObject* minfo = NULL; - PyObject* res = NULL; - - minfo = createPythonProtobuf(masterInfo, "MasterInfo"); - if (minfo == 
NULL) { - goto cleanup; // createPythonProtobuf will have set an exception - } - - res = PyObject_CallMethod(impl->pythonScheduler, - (char*) "reregistered", - (char*) "OO", - impl, - minfo); - if (res == NULL) { - cerr << "Failed to call scheduler's reregistered" << endl; - goto cleanup; - } - -cleanup: - if (PyErr_Occurred()) { - PyErr_Print(); - driver->abort(); - } - Py_XDECREF(minfo); - Py_XDECREF(res); -} - - -void ProxyScheduler::disconnected(SchedulerDriver* driver) -{ - InterpreterLock lock; - - PyObject* res = NULL; - - res = PyObject_CallMethod(impl->pythonScheduler, - (char*) "disconnected", - (char*) "O", - impl); - if (res == NULL) { - cerr << "Failed to call scheduler's disconnected" << endl; - goto cleanup; - } - -cleanup: - if (PyErr_Occurred()) { - PyErr_Print(); - driver->abort(); - } - Py_XDECREF(res); -} - - -void ProxyScheduler::resourceOffers(SchedulerDriver* driver, - const vector<Offer>& offers) -{ - InterpreterLock lock; - - PyObject* list = NULL; - PyObject* res = NULL; - - list = PyList_New(offers.size()); - if (list == NULL) { - goto cleanup; - } - for (size_t i = 0; i < offers.size(); i++) { - PyObject* offer = createPythonProtobuf(offers[i], "Offer"); - if (offer == NULL) { - goto cleanup; - } - PyList_SetItem(list, i, offer); // Steals the reference to offer - } - - res = PyObject_CallMethod(impl->pythonScheduler, - (char*) "resourceOffers", - (char*) "OO", - impl, - list); - - if (res == NULL) { - cerr << "Failed to call scheduler's resourceOffer" << endl; - goto cleanup; - } - -cleanup: - if (PyErr_Occurred()) { - PyErr_Print(); - driver->abort(); - } - Py_XDECREF(list); - Py_XDECREF(res); -} - - -void ProxyScheduler::offerRescinded(SchedulerDriver* driver, - const OfferID& offerId) -{ - InterpreterLock lock; - - PyObject* oid = NULL; - PyObject* res = NULL; - - oid = createPythonProtobuf(offerId, "OfferID"); - if (oid == NULL) { - goto cleanup; // createPythonProtobuf will have set an exception - } - - res = 
PyObject_CallMethod(impl->pythonScheduler, - (char*) "offerRescinded", - (char*) "OO", - impl, - oid); - if (res == NULL) { - cerr << "Failed to call scheduler's offerRescinded" << endl; - goto cleanup; - } - -cleanup: - if (PyErr_Occurred()) { - PyErr_Print(); - driver->abort(); - } - Py_XDECREF(oid); - Py_XDECREF(res); -} - - -void ProxyScheduler::statusUpdate(SchedulerDriver* driver, - const TaskStatus& status) -{ - InterpreterLock lock; - - PyObject* stat = NULL; - PyObject* res = NULL; - - stat = createPythonProtobuf(status, "TaskStatus"); - if (stat == NULL) { - goto cleanup; // createPythonProtobuf will have set an exception - } - - res = PyObject_CallMethod(impl->pythonScheduler, - (char*) "statusUpdate", - (char*) "OO", - impl, - stat); - if (res == NULL) { - cerr << "Failed to call scheduler's statusUpdate" << endl; - goto cleanup; - } - -cleanup: - if (PyErr_Occurred()) { - PyErr_Print(); - driver->abort(); - } - Py_XDECREF(stat); - Py_XDECREF(res); -} - - -void ProxyScheduler::frameworkMessage(SchedulerDriver* driver, - const ExecutorID& executorId, - const SlaveID& slaveId, - const string& data) -{ - InterpreterLock lock; - - PyObject* eid = NULL; - PyObject* sid = NULL; - PyObject* res = NULL; - - eid = createPythonProtobuf(executorId, "ExecutorID"); - if (eid == NULL) { - goto cleanup; // createPythonProtobuf will have set an exception - } - - sid = createPythonProtobuf(slaveId, "SlaveID"); - if (sid == NULL) { - goto cleanup; // createPythonProtobuf will have set an exception - } - - res = PyObject_CallMethod(impl->pythonScheduler, - (char*) "frameworkMessage", - (char*) "OOOs#", - impl, - eid, - sid, - data.data(), - data.length()); - if (res == NULL) { - cerr << "Failed to call scheduler's frameworkMessage" << endl; - goto cleanup; - } - -cleanup: - if (PyErr_Occurred()) { - PyErr_Print(); - driver->abort(); - } - Py_XDECREF(eid); - Py_XDECREF(sid); - Py_XDECREF(res); -} - - -void ProxyScheduler::slaveLost(SchedulerDriver* driver, const SlaveID& 
slaveId) -{ - InterpreterLock lock; - - PyObject* sid = NULL; - PyObject* res = NULL; - - sid = createPythonProtobuf(slaveId, "SlaveID"); - if (sid == NULL) { - goto cleanup; // createPythonProtobuf will have set an exception - } - - res = PyObject_CallMethod(impl->pythonScheduler, - (char*) "slaveLost", - (char*) "OO", - impl, - sid); - if (res == NULL) { - cerr << "Failed to call scheduler's slaveLost" << endl; - goto cleanup; - } - -cleanup: - if (PyErr_Occurred()) { - PyErr_Print(); - driver->abort(); - } - Py_XDECREF(sid); - Py_XDECREF(res); -} - - -void ProxyScheduler::executorLost(SchedulerDriver* driver, - const ExecutorID& executorId, - const SlaveID& slaveId, - int status) -{ - InterpreterLock lock; - - PyObject* executorIdObj = NULL; - PyObject* slaveIdObj = NULL; - PyObject* res = NULL; - - executorIdObj = createPythonProtobuf(executorId, "ExecutorID"); - slaveIdObj = createPythonProtobuf(slaveId, "SlaveID"); - - if (executorIdObj == NULL || slaveIdObj == NULL) { - goto cleanup; // createPythonProtobuf will have set an exception - } - - res = PyObject_CallMethod(impl->pythonScheduler, - (char*) "executorLost", - (char*) "OOOi", - impl, - executorIdObj, - slaveIdObj, - status); - if (res == NULL) { - cerr << "Failed to call scheduler's executorLost" << endl; - goto cleanup; - } - -cleanup: - if (PyErr_Occurred()) { - PyErr_Print(); - driver->abort(); - } - Py_XDECREF(executorIdObj); - Py_XDECREF(slaveIdObj); - Py_XDECREF(res); -} - - -void ProxyScheduler::error(SchedulerDriver* driver, const string& message) -{ - InterpreterLock lock; - PyObject* res = PyObject_CallMethod(impl->pythonScheduler, - (char*) "error", - (char*) "Os#", - impl, - message.data(), - message.length()); - if (res == NULL) { - cerr << "Failed to call scheduler's error" << endl; - goto cleanup; - } -cleanup: - if (PyErr_Occurred()) { - PyErr_Print(); - // No need for driver.stop(); it should stop itself - } - Py_XDECREF(res); -} - -} // namespace python { -} // namespace mesos { 
http://git-wip-us.apache.org/repos/asf/mesos/blob/c5a68be1/src/python/native/proxy_scheduler.hpp ---------------------------------------------------------------------- diff --git a/src/python/native/proxy_scheduler.hpp b/src/python/native/proxy_scheduler.hpp deleted file mode 100644 index 501c574..0000000 --- a/src/python/native/proxy_scheduler.hpp +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef PROXY_SCHEDULER_HPP -#define PROXY_SCHEDULER_HPP - -// Python.h must be included before standard headers. -// See: http://docs.python.org/2/c-api/intro.html#include-files -#include <Python.h> - -#include <string> -#include <vector> - -#include <mesos/scheduler.hpp> - -namespace mesos { -namespace python { - -struct MesosSchedulerDriverImpl; - -/** - * Proxy Scheduler implementation that will call into Python. 
- */ -class ProxyScheduler : public Scheduler -{ -public: - explicit ProxyScheduler(MesosSchedulerDriverImpl* _impl) : impl(_impl) {} - - virtual ~ProxyScheduler() {} - - virtual void registered(SchedulerDriver* driver, - const FrameworkID& frameworkId, - const MasterInfo& masterInfo); - virtual void reregistered(SchedulerDriver* driver, - const MasterInfo& masterInfo); - virtual void disconnected(SchedulerDriver* driver); - virtual void resourceOffers(SchedulerDriver* driver, - const std::vector<Offer>& offers); - virtual void offerRescinded(SchedulerDriver* driver, const OfferID& offerId); - virtual void statusUpdate(SchedulerDriver* driver, const TaskStatus& status); - virtual void frameworkMessage(SchedulerDriver* driver, - const ExecutorID& executorId, - const SlaveID& slaveId, - const std::string& data); - virtual void slaveLost(SchedulerDriver* driver, const SlaveID& slaveId); - virtual void executorLost(SchedulerDriver* driver, - const ExecutorID& executorId, - const SlaveID& slaveId, - int status); - virtual void error(SchedulerDriver* driver, const std::string& message); - -private: - MesosSchedulerDriverImpl* impl; -}; - -} // namespace python { -} // namespace mesos { - -#endif // PROXY_SCHEDULER_HPP http://git-wip-us.apache.org/repos/asf/mesos/blob/c5a68be1/src/python/native/setup.py.in ---------------------------------------------------------------------- diff --git a/src/python/native/setup.py.in b/src/python/native/setup.py.in new file mode 100644 index 0000000..9fc9ad2 --- /dev/null +++ b/src/python/native/setup.py.in @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import ext_modules + +config = { + 'name': 'mesos.native', + 'version': '@PACKAGE_VERSION@', + 'description': 'Mesos native driver implementation', + 'author': 'Apache Mesos', + 'author_email': '[email protected]', + 'url': 'http://pypi.python.org/pypi/mesos.native', + 'namespace_packages': [ 'mesos' ], + 'packages': [ 'mesos', 'mesos.native' ], + 'package_dir': { '': 'src' }, + 'install_requires': [ 'mesos.interface == @PACKAGE_VERSION@' ], + 'license': 'Apache 2.0', + 'keywords': 'mesos', + 'classifiers': [ ], + 'ext_modules': [ ext_modules.mesos_module ] +} + +from setuptools import setup + +setup(**config) http://git-wip-us.apache.org/repos/asf/mesos/blob/c5a68be1/src/python/native/src/mesos/__init__.py ---------------------------------------------------------------------- diff --git a/src/python/native/src/mesos/__init__.py b/src/python/native/src/mesos/__init__.py new file mode 100644 index 0000000..f48ad10 --- /dev/null +++ b/src/python/native/src/mesos/__init__.py @@ -0,0 +1,6 @@ +# See http://peak.telecommunity.com/DevCenter/setuptools#namespace-packages +try: + __import__('pkg_resources').declare_namespace(__name__) +except ImportError: + from pkgutil import extend_path + __path__ = extend_path(__path__, __name__) http://git-wip-us.apache.org/repos/asf/mesos/blob/c5a68be1/src/python/native/src/mesos/native/__init__.py ---------------------------------------------------------------------- diff --git a/src/python/native/src/mesos/native/__init__.py b/src/python/native/src/mesos/native/__init__.py new file mode 100644 index 0000000..226f943 --- 
/dev/null +++ b/src/python/native/src/mesos/native/__init__.py @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._mesos import MesosExecutorDriverImpl +from ._mesos import MesosSchedulerDriverImpl + +MesosExecutorDriver = MesosExecutorDriverImpl +MesosSchedulerDriver = MesosSchedulerDriverImpl http://git-wip-us.apache.org/repos/asf/mesos/blob/c5a68be1/src/python/native/src/mesos/native/mesos_executor_driver_impl.cpp ---------------------------------------------------------------------- diff --git a/src/python/native/src/mesos/native/mesos_executor_driver_impl.cpp b/src/python/native/src/mesos/native/mesos_executor_driver_impl.cpp new file mode 100644 index 0000000..16b9bc1 --- /dev/null +++ b/src/python/native/src/mesos/native/mesos_executor_driver_impl.cpp @@ -0,0 +1,349 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Python.h must be included before standard headers. +// See: http://docs.python.org/2/c-api/intro.html#include-files +#include <Python.h> + +#include <string> + +#include "mesos_executor_driver_impl.hpp" +#include "module.hpp" +#include "proxy_executor.hpp" + +using namespace mesos; +using namespace mesos::python; + +using std::cerr; +using std::endl; +using std::string; +using std::vector; +using std::map; + + +namespace mesos { namespace python { + +/** + * Python type object for MesosExecutorDriverImpl. + */ +PyTypeObject MesosExecutorDriverImplType = { + PyObject_HEAD_INIT(NULL) + 0, /* ob_size */ + "_mesos.MesosExecutorDriverImpl", /* tp_name */ + sizeof(MesosExecutorDriverImpl), /* tp_basicsize */ + 0, /* tp_itemsize */ + (destructor) MesosExecutorDriverImpl_dealloc, /* tp_dealloc */ + 0, /* tp_print */ + 0, /* tp_getattr */ + 0, /* tp_setattr */ + 0, /* tp_compare */ + 0, /* tp_repr */ + 0, /* tp_as_number */ + 0, /* tp_as_sequence */ + 0, /* tp_as_mapping */ + 0, /* tp_hash */ + 0, /* tp_call */ + 0, /* tp_str */ + 0, /* tp_getattro */ + 0, /* tp_setattro */ + 0, /* tp_as_buffer */ + Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC, /* tp_flags */ + "Private MesosExecutorDriver implementation", /* tp_doc */ + (traverseproc) MesosExecutorDriverImpl_traverse, /* tp_traverse */ + (inquiry) MesosExecutorDriverImpl_clear, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + MesosExecutorDriverImpl_methods, /* tp_methods */ + 0, /* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, 
/* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + (initproc) MesosExecutorDriverImpl_init, /* tp_init */ + 0, /* tp_alloc */ + MesosExecutorDriverImpl_new, /* tp_new */ +}; + + +/** + * List of Python methods in MesosExecutorDriverImpl. + */ +PyMethodDef MesosExecutorDriverImpl_methods[] = { + { "start", + (PyCFunction) MesosExecutorDriverImpl_start, + METH_NOARGS, + "Start the driver to connect to Mesos" + }, + { "stop", + (PyCFunction) MesosExecutorDriverImpl_stop, + METH_NOARGS, + "Stop the driver, disconnecting from Mesos" + }, + { "abort", + (PyCFunction) MesosExecutorDriverImpl_abort, + METH_NOARGS, + "Abort the driver, disallowing calls from and to the driver" + }, + { "join", + (PyCFunction) MesosExecutorDriverImpl_join, + METH_NOARGS, + "Wait for a running driver to disconnect from Mesos" + }, + { "run", + (PyCFunction) MesosExecutorDriverImpl_run, + METH_NOARGS, + "Start a driver and run it, returning when it disconnects from Mesos" + }, + { "sendStatusUpdate", + (PyCFunction) MesosExecutorDriverImpl_sendStatusUpdate, + METH_VARARGS, + "Send a status update for a task" + }, + { "sendFrameworkMessage", + (PyCFunction) MesosExecutorDriverImpl_sendFrameworkMessage, + METH_VARARGS, + "Send a FrameworkMessage to a slave" + }, + { NULL } /* Sentinel */ +}; + + +/** + * Create, but don't initialize, a new MesosExecutorDriverImpl + * (called by Python before init method). + */ +PyObject* MesosExecutorDriverImpl_new(PyTypeObject *type, + PyObject *args, + PyObject *kwds) +{ + MesosExecutorDriverImpl *self; + self = (MesosExecutorDriverImpl *) type->tp_alloc(type, 0); + if (self != NULL) { + self->driver = NULL; + self->proxyExecutor = NULL; + self->pythonExecutor = NULL; + } + return (PyObject*) self; +} + + +/** + * Initialize a MesosExecutorDriverImpl with constructor arguments. 
+ */ +int MesosExecutorDriverImpl_init(MesosExecutorDriverImpl *self, + PyObject *args, + PyObject *kwds) +{ + PyObject *pythonExecutor = NULL; + + if (!PyArg_ParseTuple(args, "O", &pythonExecutor)) { + return -1; + } + + if (pythonExecutor != NULL) { + PyObject* tmp = self->pythonExecutor; + Py_INCREF(pythonExecutor); + self->pythonExecutor = pythonExecutor; + Py_XDECREF(tmp); + } + + if (self->driver != NULL) { + delete self->driver; + self->driver = NULL; + } + + if (self->proxyExecutor != NULL) { + delete self->proxyExecutor; + self->proxyExecutor = NULL; + } + + self->proxyExecutor = new ProxyExecutor(self); + self->driver = new MesosExecutorDriver(self->proxyExecutor); + + return 0; +} + + +/** + * Free a MesosExecutorDriverImpl. + */ +void MesosExecutorDriverImpl_dealloc(MesosExecutorDriverImpl* self) +{ + if (self->driver != NULL) { + // We need to wrap the driver destructor in an "allow threads" + // macro since the MesosExecutorDriver destructor waits for the + // ExecutorProcess to terminate and there might be a thread that + // is trying to acquire the GIL to call through the + // ProxyExecutor. It will only be after this thread executes that + // the ExecutorProcess might actually get a terminate. + Py_BEGIN_ALLOW_THREADS + delete self->driver; + Py_END_ALLOW_THREADS + self->driver = NULL; + } + + if (self->proxyExecutor != NULL) { + delete self->proxyExecutor; + self->proxyExecutor = NULL; + } + + MesosExecutorDriverImpl_clear(self); + self->ob_type->tp_free((PyObject*) self); +} + + +/** + * Traverse fields of a MesosExecutorDriverImpl on a cyclic GC search. + * See http://docs.python.org/extending/newtypes.html. + */ +int MesosExecutorDriverImpl_traverse(MesosExecutorDriverImpl* self, + visitproc visit, + void* arg) +{ + Py_VISIT(self->pythonExecutor); + return 0; +} + + +/** + * Clear fields of a MesosExecutorDriverImpl that can participate in + * GC cycles. See http://docs.python.org/extending/newtypes.html. 
+ */ +int MesosExecutorDriverImpl_clear(MesosExecutorDriverImpl* self) +{ + Py_CLEAR(self->pythonExecutor); + return 0; +} + + +PyObject* MesosExecutorDriverImpl_start(MesosExecutorDriverImpl* self) +{ + if (self->driver == NULL) { + PyErr_Format(PyExc_Exception, "MesosExecutorDriverImpl.driver is NULL"); + return NULL; + } + + Status status = self->driver->start(); + return PyInt_FromLong(status); // Sets an exception if creating the int fails +} + + +PyObject* MesosExecutorDriverImpl_stop(MesosExecutorDriverImpl* self) +{ + if (self->driver == NULL) { + PyErr_Format(PyExc_Exception, "MesosExecutorDriverImpl.driver is NULL"); + return NULL; + } + + Status status = self->driver->stop(); + return PyInt_FromLong(status); // Sets an exception if creating the int fails +} + + +PyObject* MesosExecutorDriverImpl_abort(MesosExecutorDriverImpl* self) +{ + if (self->driver == NULL) { + PyErr_Format(PyExc_Exception, "MesosExecutorDriverImpl.driver is NULL"); + return NULL; + } + + Status status = self->driver->abort(); + return PyInt_FromLong(status); // Sets an exception if creating the int fails +} + + +PyObject* MesosExecutorDriverImpl_join(MesosExecutorDriverImpl* self) +{ + if (self->driver == NULL) { + PyErr_Format(PyExc_Exception, "MesosExecutorDriverImpl.driver is NULL"); + return NULL; + } + + Status status; + Py_BEGIN_ALLOW_THREADS + status = self->driver->join(); + Py_END_ALLOW_THREADS + return PyInt_FromLong(status); // Sets an exception if creating the int fails +} + + +PyObject* MesosExecutorDriverImpl_run(MesosExecutorDriverImpl* self) +{ + if (self->driver == NULL) { + PyErr_Format(PyExc_Exception, "MesosExecutorDriverImpl.driver is NULL"); + return NULL; + } + + Status status; + Py_BEGIN_ALLOW_THREADS + status = self->driver->run(); + Py_END_ALLOW_THREADS + return PyInt_FromLong(status); // Sets an exception if creating the int fails +} + + +PyObject* MesosExecutorDriverImpl_sendStatusUpdate( + MesosExecutorDriverImpl* self, + PyObject* args) +{ + if 
(self->driver == NULL) { + PyErr_Format(PyExc_Exception, "MesosExecutorDriverImpl.driver is NULL"); + return NULL; + } + + PyObject* statusObj = NULL; + TaskStatus taskStatus; + if (!PyArg_ParseTuple(args, "O", &statusObj)) { + return NULL; + } + if (!readPythonProtobuf(statusObj, &taskStatus)) { + PyErr_Format(PyExc_Exception, + "Could not deserialize Python TaskStatus"); + return NULL; + } + + Status status = self->driver->sendStatusUpdate(taskStatus); + return PyInt_FromLong(status); // Sets an exception if creating the int fails +} + + +PyObject* MesosExecutorDriverImpl_sendFrameworkMessage( + MesosExecutorDriverImpl* self, + PyObject* args) +{ + if (self->driver == NULL) { + PyErr_Format(PyExc_Exception, "MesosExecutorDriverImpl.driver is NULL"); + return NULL; + } + + const char* data; + int length; + if (!PyArg_ParseTuple(args, "s#", &data, &length)) { + return NULL; + } + + Status status = self->driver->sendFrameworkMessage(string(data, length)); + return PyInt_FromLong(status); // Sets an exception if creating the int fails +} + +} // namespace python { +} // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/c5a68be1/src/python/native/src/mesos/native/mesos_executor_driver_impl.hpp ---------------------------------------------------------------------- diff --git a/src/python/native/src/mesos/native/mesos_executor_driver_impl.hpp b/src/python/native/src/mesos/native/mesos_executor_driver_impl.hpp new file mode 100644 index 0000000..7245414 --- /dev/null +++ b/src/python/native/src/mesos/native/mesos_executor_driver_impl.hpp @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef MESOS_EXECUTOR_DRIVER_IMPL_HPP +#define MESOS_EXECUTOR_DRIVER_IMPL_HPP + +#include <mesos/executor.hpp> + + +namespace mesos { namespace python { + +class ProxyExecutor; + +/** + * Python object structure for MesosExecutorDriverImpl objects. + */ +struct MesosExecutorDriverImpl { + PyObject_HEAD + /* Type-specific fields go here. */ + MesosExecutorDriver* driver; + ProxyExecutor* proxyExecutor; + PyObject* pythonExecutor; +}; + +/** + * Python type object for MesosExecutorDriverImpl. + */ +extern PyTypeObject MesosExecutorDriverImplType; + +/** + * List of Python methods in MesosExecutorDriverImpl. + */ +extern PyMethodDef MesosExecutorDriverImpl_methods[]; + +/** + * Create, but don't initialize, a new MesosExecutorDriverImpl + * (called by Python before init method). + */ +PyObject* MesosExecutorDriverImpl_new(PyTypeObject *type, + PyObject *args, + PyObject *kwds); + +/** + * Initialize a MesosExecutorDriverImpl with constructor arguments. + */ +int MesosExecutorDriverImpl_init(MesosExecutorDriverImpl *self, + PyObject *args, + PyObject *kwds); + +/** + * Free a MesosExecutorDriverImpl. + */ +void MesosExecutorDriverImpl_dealloc(MesosExecutorDriverImpl* self); + +/** + * Traverse fields of a MesosExecutorDriverImpl on a cyclic GC search. + * See http://docs.python.org/extending/newtypes.html. + */ +int MesosExecutorDriverImpl_traverse(MesosExecutorDriverImpl* self, + visitproc visit, + void* arg); +/** + * Clear fields of a MesosExecutorDriverImpl that can participate in + * GC cycles. 
See http://docs.python.org/extending/newtypes.html. + */ +int MesosExecutorDriverImpl_clear(MesosExecutorDriverImpl* self); + +// MesosExecutorDriverImpl methods +PyObject* MesosExecutorDriverImpl_start(MesosExecutorDriverImpl* self); + +PyObject* MesosExecutorDriverImpl_stop(MesosExecutorDriverImpl* self); + +PyObject* MesosExecutorDriverImpl_abort(MesosExecutorDriverImpl* self); + +PyObject* MesosExecutorDriverImpl_join(MesosExecutorDriverImpl* self); + +PyObject* MesosExecutorDriverImpl_run(MesosExecutorDriverImpl* self); + +PyObject* MesosExecutorDriverImpl_sendStatusUpdate( + MesosExecutorDriverImpl* self, + PyObject* args); + +PyObject* MesosExecutorDriverImpl_sendFrameworkMessage( + MesosExecutorDriverImpl* self, + PyObject* args); + +} // namespace python { +} // namespace mesos { + +#endif /* MESOS_EXECUTOR_DRIVER_IMPL_HPP */ http://git-wip-us.apache.org/repos/asf/mesos/blob/c5a68be1/src/python/native/src/mesos/native/mesos_scheduler_driver_impl.cpp ---------------------------------------------------------------------- diff --git a/src/python/native/src/mesos/native/mesos_scheduler_driver_impl.cpp b/src/python/native/src/mesos/native/mesos_scheduler_driver_impl.cpp new file mode 100644 index 0000000..e014eed --- /dev/null +++ b/src/python/native/src/mesos/native/mesos_scheduler_driver_impl.cpp @@ -0,0 +1,634 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Python.h must be included before standard headers. +// See: http://docs.python.org/2/c-api/intro.html#include-files +#include <Python.h> + +#include <string> + +#include "mesos_scheduler_driver_impl.hpp" +#include "module.hpp" +#include "proxy_scheduler.hpp" + +using namespace mesos; +using namespace mesos::python; + +using std::cerr; +using std::endl; +using std::string; +using std::vector; +using std::map; + +namespace mesos { +namespace python { + +/** + * Python type object for MesosSchedulerDriverImpl. + */ +PyTypeObject MesosSchedulerDriverImplType = { + PyObject_HEAD_INIT(NULL) + 0, /* ob_size */ + "_mesos.MesosSchedulerDriverImpl", /* tp_name */ + sizeof(MesosSchedulerDriverImpl), /* tp_basicsize */ + 0, /* tp_itemsize */ + (destructor) MesosSchedulerDriverImpl_dealloc, /* tp_dealloc */ + 0, /* tp_print */ + 0, /* tp_getattr */ + 0, /* tp_setattr */ + 0, /* tp_compare */ + 0, /* tp_repr */ + 0, /* tp_as_number */ + 0, /* tp_as_sequence */ + 0, /* tp_as_mapping */ + 0, /* tp_hash */ + 0, /* tp_call */ + 0, /* tp_str */ + 0, /* tp_getattro */ + 0, /* tp_setattro */ + 0, /* tp_as_buffer */ + Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC, /* tp_flags */ + "Private MesosSchedulerDriver implementation", /* tp_doc */ + (traverseproc) MesosSchedulerDriverImpl_traverse, /* tp_traverse */ + (inquiry) MesosSchedulerDriverImpl_clear, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + MesosSchedulerDriverImpl_methods, /* tp_methods */ + 0, /* tp_members */ + 0, /* tp_getset */ + 0, /* 
tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + (initproc) MesosSchedulerDriverImpl_init, /* tp_init */ + 0, /* tp_alloc */ + MesosSchedulerDriverImpl_new, /* tp_new */ +}; + + +/** + * List of Python methods in MesosSchedulerDriverImpl. + */ +PyMethodDef MesosSchedulerDriverImpl_methods[] = { + { "start", + (PyCFunction) MesosSchedulerDriverImpl_start, + METH_NOARGS, + "Start the driver to connect to Mesos" + }, + { "stop", + (PyCFunction) MesosSchedulerDriverImpl_stop, + METH_VARARGS, + "Stop the driver, disconnecting from Mesos" + }, + { "abort", + (PyCFunction) MesosSchedulerDriverImpl_abort, + METH_NOARGS, + "Abort the driver, disabling calls from and to the driver" + }, + { "join", + (PyCFunction) MesosSchedulerDriverImpl_join, + METH_NOARGS, + "Wait for a running driver to disconnect from Mesos" + }, + { "run", + (PyCFunction) MesosSchedulerDriverImpl_run, + METH_NOARGS, + "Start a driver and run it, returning when it disconnects from Mesos" + }, + { "requestResources", + (PyCFunction) MesosSchedulerDriverImpl_requestResources, + METH_VARARGS, + "Request resources from the Mesos allocator" + }, + { "launchTasks", + (PyCFunction) MesosSchedulerDriverImpl_launchTasks, + METH_VARARGS, + "Reply to a Mesos offer with a list of tasks" + }, + { "killTask", + (PyCFunction) MesosSchedulerDriverImpl_killTask, + METH_VARARGS, + "Kill the task with the given ID" + }, + { "declineOffer", + (PyCFunction) MesosSchedulerDriverImpl_declineOffer, + METH_VARARGS, + "Decline a Mesos offer" + }, + { "reviveOffers", + (PyCFunction) MesosSchedulerDriverImpl_reviveOffers, + METH_NOARGS, + "Remove all filters and ask Mesos for new offers" + }, + { "sendFrameworkMessage", + (PyCFunction) MesosSchedulerDriverImpl_sendFrameworkMessage, + METH_VARARGS, + "Send a FrameworkMessage to a slave" + }, + { "reconcileTasks", + (PyCFunction) MesosSchedulerDriverImpl_reconcileTasks, + METH_VARARGS, + "Master sends status updates if task 
status is different from expected" + }, + { NULL } /* Sentinel */ +}; + + +/** + * Create, but don't initialize, a new MesosSchedulerDriverImpl + * (called by Python before init method). + */ +PyObject* MesosSchedulerDriverImpl_new(PyTypeObject* type, + PyObject* args, + PyObject* kwds) +{ + MesosSchedulerDriverImpl* self; + self = (MesosSchedulerDriverImpl*) type->tp_alloc(type, 0); + if (self != NULL) { + self->driver = NULL; + self->proxyScheduler = NULL; + self->pythonScheduler = NULL; + } + return (PyObject*) self; +} + + +/** + * Initialize a MesosSchedulerDriverImpl with constructor arguments. + */ +int MesosSchedulerDriverImpl_init(MesosSchedulerDriverImpl* self, + PyObject* args, + PyObject* kwds) +{ + PyObject* schedulerObj = NULL; + PyObject* frameworkObj = NULL; + const char* master; + PyObject* credentialObj = NULL; + + if (!PyArg_ParseTuple( + args, "OOs|O", &schedulerObj, &frameworkObj, &master, &credentialObj)) { + return -1; + } + + if (schedulerObj != NULL) { + PyObject* tmp = self->pythonScheduler; + Py_INCREF(schedulerObj); + self->pythonScheduler = schedulerObj; + Py_XDECREF(tmp); + } + + FrameworkInfo framework; + if (frameworkObj != NULL) { + if (!readPythonProtobuf(frameworkObj, &framework)) { + PyErr_Format(PyExc_Exception, + "Could not deserialize Python FrameworkInfo"); + return -1; + } + } + + Credential credential; + if (credentialObj != NULL) { + if (!readPythonProtobuf(credentialObj, &credential)) { + PyErr_Format(PyExc_Exception, "Could not deserialize Python Credential"); + return -1; + } + } + + + if (self->driver != NULL) { + delete self->driver; + self->driver = NULL; + } + + if (self->proxyScheduler != NULL) { + delete self->proxyScheduler; + self->proxyScheduler = NULL; + } + + self->proxyScheduler = new ProxyScheduler(self); + + if (credentialObj != NULL) { + self->driver = new MesosSchedulerDriver( + self->proxyScheduler, framework, master, credential); + } else { + self->driver = new MesosSchedulerDriver( + 
self->proxyScheduler, framework, master); + } + + return 0; +} + + +/** + * Free a MesosSchedulerDriverImpl. + */ +void MesosSchedulerDriverImpl_dealloc(MesosSchedulerDriverImpl* self) +{ + if (self->driver != NULL) { + // We need to wrap the driver destructor in an "allow threads" + // macro since the MesosSchedulerDriver destructor waits for the + // SchedulerProcess to terminate and there might be a thread that + // is trying to acquire the GIL to call through the + // ProxyScheduler. It will only be after this thread executes that + // the SchedulerProcess might actually get a terminate. + Py_BEGIN_ALLOW_THREADS + delete self->driver; + Py_END_ALLOW_THREADS + self->driver = NULL; + } + + if (self->proxyScheduler != NULL) { + delete self->proxyScheduler; + self->proxyScheduler = NULL; + } + + MesosSchedulerDriverImpl_clear(self); + self->ob_type->tp_free((PyObject*) self); +} + + +/** + * Traverse fields of a MesosSchedulerDriverImpl on a cyclic GC search. + * See http://docs.python.org/extending/newtypes.html. + */ +int MesosSchedulerDriverImpl_traverse(MesosSchedulerDriverImpl* self, + visitproc visit, + void* arg) +{ + Py_VISIT(self->pythonScheduler); + return 0; +} + + +/** + * Clear fields of a MesosSchedulerDriverImpl that can participate in + * GC cycles. See http://docs.python.org/extending/newtypes.html. + */ +int MesosSchedulerDriverImpl_clear(MesosSchedulerDriverImpl* self) +{ + Py_CLEAR(self->pythonScheduler); + return 0; +} + + +PyObject* MesosSchedulerDriverImpl_start(MesosSchedulerDriverImpl* self) +{ + if (self->driver == NULL) { + PyErr_Format(PyExc_Exception, "MesosSchedulerDriverImpl.driver is NULL"); + return NULL; + } + + Status status = self->driver->start(); + return PyInt_FromLong(status); // Sets exception if creating long fails. 
+} + + +PyObject* MesosSchedulerDriverImpl_stop(MesosSchedulerDriverImpl* self, + PyObject* args) +{ + if (self->driver == NULL) { + PyErr_Format(PyExc_Exception, "MesosSchedulerDriverImpl.driver is NULL"); + return NULL; + } + + bool failover = false; // Should match default in mesos.py. + + if (!PyArg_ParseTuple(args, "|b", &failover)) { + return NULL; + } + + Status status = self->driver->stop(failover); + return PyInt_FromLong(status); // Sets exception if creating long fails. +} + + +PyObject* MesosSchedulerDriverImpl_abort(MesosSchedulerDriverImpl* self) +{ + if (self->driver == NULL) { + PyErr_Format(PyExc_Exception, "MesosSchedulerDriverImpl.driver is NULL"); + return NULL; + } + + Status status = self->driver->abort(); + return PyInt_FromLong(status); // Sets exception if creating long fails. +} + + +PyObject* MesosSchedulerDriverImpl_join(MesosSchedulerDriverImpl* self) +{ + if (self->driver == NULL) { + PyErr_Format(PyExc_Exception, "MesosSchedulerDriverImpl.driver is NULL"); + return NULL; + } + + Status status; + Py_BEGIN_ALLOW_THREADS + status = self->driver->join(); + Py_END_ALLOW_THREADS + return PyInt_FromLong(status); // Sets exception if creating long fails. +} + + +PyObject* MesosSchedulerDriverImpl_run(MesosSchedulerDriverImpl* self) +{ + if (self->driver == NULL) { + PyErr_Format(PyExc_Exception, "MesosSchedulerDriverImpl.driver is NULL"); + return NULL; + } + + Status status; + Py_BEGIN_ALLOW_THREADS + status = self->driver->run(); + Py_END_ALLOW_THREADS + return PyInt_FromLong(status); // Sets exception if creating long fails. 
+} + + +PyObject* MesosSchedulerDriverImpl_requestResources( + MesosSchedulerDriverImpl* self, + PyObject* args) +{ + if (self->driver == NULL) { + PyErr_Format(PyExc_Exception, "MesosSchedulerDriverImpl.driver is NULL"); + return NULL; + } + + PyObject* requestsObj = NULL; + vector<Request> requests; + + if (!PyArg_ParseTuple(args, "O", &requestsObj)) { + return NULL; + } + + if (!PyList_Check(requestsObj)) { + PyErr_Format(PyExc_Exception, + "Parameter 2 to requestsResources is not a list"); + return NULL; + } + Py_ssize_t len = PyList_Size(requestsObj); + for (int i = 0; i < len; i++) { + PyObject* requestObj = PyList_GetItem(requestsObj, i); + if (requestObj == NULL) { + return NULL; // Exception will have been set by PyList_GetItem + } + Request request; + if (!readPythonProtobuf(requestObj, &request)) { + PyErr_Format(PyExc_Exception, "Could not deserialize Python Request"); + return NULL; + } + requests.push_back(request); + } + + Status status = self->driver->requestResources(requests); + return PyInt_FromLong(status); // Sets exception if creating long fails. +} + + +PyObject* MesosSchedulerDriverImpl_launchTasks(MesosSchedulerDriverImpl* self, + PyObject* args) +{ + if (self->driver == NULL) { + PyErr_Format(PyExc_Exception, "MesosSchedulerDriverImpl.driver is NULL"); + return NULL; + } + + PyObject* offerIdsObj = NULL; + PyObject* tasksObj = NULL; + PyObject* filtersObj = NULL; + vector<OfferID> offerIds; + vector<TaskInfo> tasks; + Filters filters; + + if (!PyArg_ParseTuple(args, "OO|O", &offerIdsObj, &tasksObj, &filtersObj)) { + return NULL; + } + + // Offer argument can be a list of offer ids or a single offer id (for + // backward compatibility). 
+ if (!PyList_Check(offerIdsObj)) { + OfferID offerId; + if (!readPythonProtobuf(offerIdsObj, &offerId)) { + PyErr_Format(PyExc_Exception, "Could not deserialize Python OfferID"); + return NULL; + } + offerIds.push_back(offerId); + } else { + Py_ssize_t len = PyList_Size(offerIdsObj); + for (int i = 0; i < len; i++) { + PyObject* offerObj = PyList_GetItem(offerIdsObj, i); + if (offerObj == NULL) { + return NULL; + } + OfferID offerId; + if (!readPythonProtobuf(offerObj, &offerId)) { + PyErr_Format(PyExc_Exception, + "Could not deserialize Python OfferID"); + return NULL; + } + offerIds.push_back(offerId); + } + } + + if (!PyList_Check(tasksObj)) { + PyErr_Format(PyExc_Exception, "Parameter 2 to launchTasks is not a list"); + return NULL; + } + Py_ssize_t len = PyList_Size(tasksObj); + for (int i = 0; i < len; i++) { + PyObject* taskObj = PyList_GetItem(tasksObj, i); + if (taskObj == NULL) { + return NULL; // Exception will have been set by PyList_GetItem + } + TaskInfo task; + if (!readPythonProtobuf(taskObj, &task)) { + PyErr_Format(PyExc_Exception, + "Could not deserialize Python TaskInfo"); + return NULL; + } + tasks.push_back(task); + } + + if (filtersObj != NULL) { + if (!readPythonProtobuf(filtersObj, &filters)) { + PyErr_Format(PyExc_Exception, + "Could not deserialize Python Filters"); + return NULL; + } + } + + Status status = self->driver->launchTasks(offerIds, tasks, filters); + return PyInt_FromLong(status); // Sets exception if creating long fails. 
+} + + +PyObject* MesosSchedulerDriverImpl_killTask(MesosSchedulerDriverImpl* self, + PyObject* args) +{ + if (self->driver == NULL) { + PyErr_Format(PyExc_Exception, "MesosSchedulerDriverImpl.driver is NULL"); + return NULL; + } + + PyObject* tidObj = NULL; + TaskID tid; + if (!PyArg_ParseTuple(args, "O", &tidObj)) { + return NULL; + } + if (!readPythonProtobuf(tidObj, &tid)) { + PyErr_Format(PyExc_Exception, "Could not deserialize Python TaskID"); + return NULL; + } + + Status status = self->driver->killTask(tid); + return PyInt_FromLong(status); // Sets exception if creating long fails. +} + + +PyObject* MesosSchedulerDriverImpl_declineOffer(MesosSchedulerDriverImpl* self, + PyObject* args) +{ + if (self->driver == NULL) { + PyErr_Format(PyExc_Exception, "MesosSchedulerDriverImpl.driver is NULL"); + return NULL; + } + + PyObject* offerIdObj = NULL; + PyObject* filtersObj = NULL; + OfferID offerId; + Filters filters; + + if (!PyArg_ParseTuple(args, "O|O", &offerIdObj, &filtersObj)) { + return NULL; + } + + if (!readPythonProtobuf(offerIdObj, &offerId)) { + PyErr_Format(PyExc_Exception, "Could not deserialize Python OfferID"); + return NULL; + } + + if (filtersObj != NULL) { + if (!readPythonProtobuf(filtersObj, &filters)) { + PyErr_Format(PyExc_Exception, + "Could not deserialize Python Filters"); + return NULL; + } + } + + Status status = self->driver->declineOffer(offerId, filters); + return PyInt_FromLong(status); // Sets exception if creating long fails. +} + + +PyObject* MesosSchedulerDriverImpl_reviveOffers(MesosSchedulerDriverImpl* self) +{ + if (self->driver == NULL) { + PyErr_Format(PyExc_Exception, "MesosSchedulerDriverImpl.driver is NULL"); + return NULL; + } + + Status status = self->driver->reviveOffers(); + return PyInt_FromLong(status); // Sets exception if creating long fails. 
+} + + +PyObject* MesosSchedulerDriverImpl_sendFrameworkMessage( + MesosSchedulerDriverImpl* self, + PyObject* args) +{ + if (self->driver == NULL) { + PyErr_Format(PyExc_Exception, "MesosSchedulerDriverImpl.driver is NULL"); + return NULL; + } + + PyObject* slaveIdObj = NULL; + PyObject* executorIdObj = NULL; + SlaveID slaveId; + ExecutorID executorId; + const char* data; + int length; + + if (!PyArg_ParseTuple( + args, "OOs#", &executorIdObj, &slaveIdObj, &data, &length)) { + return NULL; + } + + if (!readPythonProtobuf(executorIdObj, &executorId)) { + PyErr_Format(PyExc_Exception, "Could not deserialize Python ExecutorID"); + return NULL; + } + + if (!readPythonProtobuf(slaveIdObj, &slaveId)) { + PyErr_Format(PyExc_Exception, "Could not deserialize Python SlaveID"); + return NULL; + } + + Status status = self->driver->sendFrameworkMessage( + executorId, slaveId, string(data, length)); + + return PyInt_FromLong(status); // Sets exception if creating long fails. +} + + +PyObject* MesosSchedulerDriverImpl_reconcileTasks( + MesosSchedulerDriverImpl* self, + PyObject* args) +{ + if (self->driver == NULL) { + PyErr_Format(PyExc_Exception, "MesosSchedulerDriverImpl.driver is NULL"); + return NULL; + } + + PyObject* statusesObj = NULL; + vector<TaskStatus> statuses; + + if (!PyArg_ParseTuple(args, "O", &statusesObj)) { + return NULL; + } + + if (!PyList_Check(statusesObj)) { + PyErr_Format(PyExc_Exception, + "Parameter 1 to reconcileTasks is not a list"); + + return NULL; + } + + Py_ssize_t len = PyList_Size(statusesObj); + for (int i = 0; i < len; i++) { + PyObject* statusObj = PyList_GetItem(statusesObj, i); + if (statusObj == NULL) { + return NULL; + } + + TaskStatus status; + if (!readPythonProtobuf(statusObj, &status)) { + PyErr_Format(PyExc_Exception, + "Could not deserialize Python TaskStatus"); + return NULL; + } + statuses.push_back(status); + } + + Status status = self->driver->reconcileTasks(statuses); + return PyInt_FromLong(status); +} + +} // namespace 
python { +} // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/c5a68be1/src/python/native/src/mesos/native/mesos_scheduler_driver_impl.hpp ---------------------------------------------------------------------- diff --git a/src/python/native/src/mesos/native/mesos_scheduler_driver_impl.hpp b/src/python/native/src/mesos/native/mesos_scheduler_driver_impl.hpp new file mode 100644 index 0000000..8c285ae --- /dev/null +++ b/src/python/native/src/mesos/native/mesos_scheduler_driver_impl.hpp @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef MESOS_SCHEDULER_DRIVER_IMPL_HPP +#define MESOS_SCHEDULER_DRIVER_IMPL_HPP + +#include <mesos/scheduler.hpp> + + +namespace mesos { namespace python { + +class ProxyScheduler; + +/** + * Python object structure for MesosSchedulerDriverImpl objects. + */ +struct MesosSchedulerDriverImpl { + PyObject_HEAD + /* Type-specific fields go here. */ + MesosSchedulerDriver* driver; + ProxyScheduler* proxyScheduler; + PyObject* pythonScheduler; +}; + +/** + * Python type object for MesosSchedulerDriverImpl. + */ +extern PyTypeObject MesosSchedulerDriverImplType; + +/** + * List of Python methods in MesosSchedulerDriverImpl. 
+ */ +extern PyMethodDef MesosSchedulerDriverImpl_methods[]; + +/** + * Create, but don't initialize, a new MesosSchedulerDriverImpl + * (called by Python before init method). + */ +PyObject* MesosSchedulerDriverImpl_new(PyTypeObject *type, + PyObject *args, + PyObject *kwds); + +/** + * Initialize a MesosSchedulerDriverImpl with constructor arguments. + */ +int MesosSchedulerDriverImpl_init(MesosSchedulerDriverImpl *self, + PyObject *args, + PyObject *kwds); + +/** + * Free a MesosSchedulerDriverImpl. + */ +void MesosSchedulerDriverImpl_dealloc(MesosSchedulerDriverImpl* self); + +/** + * Traverse fields of a MesosSchedulerDriverImpl on a cyclic GC search. + * See http://docs.python.org/extending/newtypes.html. + */ +int MesosSchedulerDriverImpl_traverse(MesosSchedulerDriverImpl* self, + visitproc visit, + void* arg); +/** + * Clear fields of a MesosSchedulerDriverImpl that can participate in + * GC cycles. See http://docs.python.org/extending/newtypes.html. + */ +int MesosSchedulerDriverImpl_clear(MesosSchedulerDriverImpl* self); + +// MesosSchedulerDriverImpl methods +PyObject* MesosSchedulerDriverImpl_start(MesosSchedulerDriverImpl* self); + +PyObject* MesosSchedulerDriverImpl_stop( + MesosSchedulerDriverImpl* self, + PyObject* args); + +PyObject* MesosSchedulerDriverImpl_abort(MesosSchedulerDriverImpl* self); + +PyObject* MesosSchedulerDriverImpl_join(MesosSchedulerDriverImpl* self); + +PyObject* MesosSchedulerDriverImpl_run(MesosSchedulerDriverImpl* self); + +PyObject* MesosSchedulerDriverImpl_requestResources( + MesosSchedulerDriverImpl* self, + PyObject* args); + +PyObject* MesosSchedulerDriverImpl_launchTasks( + MesosSchedulerDriverImpl* self, + PyObject* args); + +PyObject* MesosSchedulerDriverImpl_killTask( + MesosSchedulerDriverImpl* self, + PyObject* args); + +PyObject* MesosSchedulerDriverImpl_declineOffer( + MesosSchedulerDriverImpl* self, + PyObject* args); + +PyObject* MesosSchedulerDriverImpl_reviveOffers(MesosSchedulerDriverImpl* self); + 
+PyObject* MesosSchedulerDriverImpl_sendFrameworkMessage( + MesosSchedulerDriverImpl* self, + PyObject* args); + +PyObject* MesosSchedulerDriverImpl_reconcileTasks( + MesosSchedulerDriverImpl* self, + PyObject* args); + +} // namespace python { +} // namespace mesos { + +#endif /* MESOS_SCHEDULER_DRIVER_IMPL_HPP */ http://git-wip-us.apache.org/repos/asf/mesos/blob/c5a68be1/src/python/native/src/mesos/native/module.cpp ---------------------------------------------------------------------- diff --git a/src/python/native/src/mesos/native/module.cpp b/src/python/native/src/mesos/native/module.cpp new file mode 100644 index 0000000..6d0de5f --- /dev/null +++ b/src/python/native/src/mesos/native/module.cpp @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * NOTE(review): this comment is identical to the one in the deleted + * src/python/native/module.cpp; the <root>/src/python/src/mesos.py path below + * may be stale now that the import is mesos.interface.mesos_pb2 -- confirm. + * + * This file defines the _mesos.so binary module used by the Mesos Python API. + * This module contains private implementations of MesosSchedulerDriver and + * MesosExecutorDriver as Python types that get called from the public module + * called mesos (in <root>/src/python/src/mesos.py). This design was chosen + * so that most of the API (e.g.
the Scheduler and Executor interfaces) can + * be written in Python, and only the parts that need to call into C++ are + * in C++. Note that the mesos module also contains public classes called + * MesosSchedulerDriver and MesosExecutorDriver. These call into the private + * _mesos.MesosSchedulerDriverImpl and _mesos.MesosExecutorDriverImpl. + */ + +// Python.h must be included before standard headers. +// See: http://docs.python.org/2/c-api/intro.html#include-files +#include <Python.h> + +#include <iostream> + +#include <mesos/executor.hpp> +#include <mesos/scheduler.hpp> + +#include "module.hpp" +#include "proxy_scheduler.hpp" +#include "mesos_scheduler_driver_impl.hpp" +#include "proxy_executor.hpp" +#include "mesos_executor_driver_impl.hpp" + +using namespace mesos; +using namespace mesos::python; + +using std::string; +using std::vector; +using std::map; + + +/** + * The Python module object for mesos_pb2 (which contains the protobuf + * classes generated for Python). + * Starts out NULL and is assigned in init_mesos() from + * PyImport_ImportModule("mesos.interface.mesos_pb2"). + */ +PyObject* mesos::python::mesos_pb2 = NULL; + + +namespace { + +/** + * Method list for our Python module. + * Holds only the terminating sentinel entry, so no module-level functions + * are registered; init_mesos() adds the two driver types to the module + * separately. + */ +PyMethodDef MODULE_METHODS[] = { + {NULL, NULL, 0, NULL} /* Sentinel */ +}; + +} // namespace { + + +/** + * Entry point called by Python to initialize our module.
+ */ +PyMODINIT_FUNC init_mesos(void) +{ + // Ensure that the interpreter's threading support is enabled + PyEval_InitThreads(); + + // Import the mesos_pb2 module (on which we depend for protobuf classes) + mesos_pb2 = PyImport_ImportModule("mesos.interface.mesos_pb2"); + if (mesos_pb2 == NULL) + return; + + // Initialize our Python types + if (PyType_Ready(&MesosSchedulerDriverImplType) < 0) + return; + if (PyType_Ready(&MesosExecutorDriverImplType) < 0) + return; + + // Create the _mesos module and add our types to it + PyObject* module = Py_InitModule("_mesos", MODULE_METHODS); + Py_INCREF(&MesosSchedulerDriverImplType); + PyModule_AddObject(module, + "MesosSchedulerDriverImpl", + (PyObject*) &MesosSchedulerDriverImplType); + Py_INCREF(&MesosExecutorDriverImplType); + PyModule_AddObject(module, + "MesosExecutorDriverImpl", + (PyObject*) &MesosExecutorDriverImplType); +} http://git-wip-us.apache.org/repos/asf/mesos/blob/c5a68be1/src/python/native/src/mesos/native/module.hpp ---------------------------------------------------------------------- diff --git a/src/python/native/src/mesos/native/module.hpp b/src/python/native/src/mesos/native/module.hpp new file mode 100644 index 0000000..1c35e2e --- /dev/null +++ b/src/python/native/src/mesos/native/module.hpp @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef MODULE_HPP +#define MODULE_HPP + +// Python.h must be included before standard headers. +// See: http://docs.python.org/2/c-api/intro.html#include-files +#include <Python.h> + +#include <iostream> + +#include <google/protobuf/io/zero_copy_stream_impl.h> + + +namespace mesos { namespace python { + +/** + * The Python module object for mesos_pb2 (which contains the protobuf + * classes generated for Python). + */ +extern PyObject* mesos_pb2; + + +/** + * RAII utility class for acquiring the Python global interpreter lock. + */ +class InterpreterLock { + PyGILState_STATE state; + +public: + InterpreterLock() { + state = PyGILState_Ensure(); + } + + ~InterpreterLock() { + PyGILState_Release(state); + } +}; + + +/** + * Convert a Python protocol buffer object into a C++ one by serializing + * it to a string and deserializing the result back in C++. Returns true + * on success, or prints an error and returns false on failure. 
+ */
+template <typename T>
+bool readPythonProtobuf(PyObject* obj, T* t)
+{
+  // None is rejected up front, before any method call is attempted.
+  if (obj == Py_None) {
+    std::cerr << "None object given where protobuf expected" << std::endl;
+    return false;
+  }
+
+  // Ask the Python object for its serialized form.
+  PyObject* serialized = PyObject_CallMethod(obj,
+                                             (char*) "SerializeToString",
+                                             (char*) NULL);
+  if (serialized == NULL) {
+    std::cerr << "Failed to call Python object's SerializeToString "
+              << "(perhaps it is not a protobuf?)" << std::endl;
+    PyErr_Print();
+    return false;
+  }
+
+  bool parsed = false;
+  char* bytes;
+  Py_ssize_t size;
+  if (PyString_AsStringAndSize(serialized, &bytes, &size) < 0) {
+    std::cerr << "SerializeToString did not return a string" << std::endl;
+    PyErr_Print();
+  } else {
+    // Parse directly from the buffer obtained above; `serialized` is only
+    // released after parsing has finished.
+    google::protobuf::io::ArrayInputStream stream(bytes, size);
+    parsed = t->ParseFromZeroCopyStream(&stream);
+    if (!parsed) {
+      std::cerr << "Could not deserialize protobuf as expected type" << std::endl;
+    }
+  }
+
+  // Single release point for the serialized string on both outcomes.
+  Py_DECREF(serialized);
+  return parsed;
+}
+
+
+/**
+ * Convert a C++ protocol buffer object into a Python one by serializing
+ * it to a string and deserializing the result back in Python. Returns the
+ * resulting PyObject* on success or raises a Python exception and returns
+ * NULL on failure.
+ */ +template <typename T> +PyObject* createPythonProtobuf(const T& t, const char* typeName) +{ + PyObject* dict = PyModule_GetDict(mesos_pb2); + if (dict == NULL) { + PyErr_Format(PyExc_Exception, "PyModule_GetDict failed"); + return NULL; + } + + PyObject* type = PyDict_GetItemString(dict, typeName); + if (type == NULL) { + PyErr_Format(PyExc_Exception, "Could not resolve mesos_pb2.%s", typeName); + return NULL; + } + if (!PyType_Check(type)) { + PyErr_Format(PyExc_Exception, "mesos_pb2.%s is not a type", typeName); + return NULL; + } + + std::string str; + if (!t.SerializeToString(&str)) { + PyErr_Format(PyExc_Exception, "C++ %s SerializeToString failed", typeName); + return NULL; + } + + // Propagates any exception that might happen in FromString + return PyObject_CallMethod(type, + (char*) "FromString", + (char*) "s#", + str.data(), + str.size()); +} + +} // namespace python { +} // namespace mesos { + +#endif /* MODULE_HPP */
