Repository: mesos Updated Branches: refs/heads/master 99acafa5e -> 3b70d417a
Removed unnecessary Registry copying to improve performance. When the number of agents is large every `Registry` copy operation takes a lot of time (~0.4 sec with 55k agents), because it involves deep copying a big object tree. Because of that, the use of `protobuf::State` in `Registrar` incurs a dramatic performance cost from multiple protobuf copying. This patch drops the use of `protobuf::State` in `Registrar` in favor of "untyped" `State` and manual serialization/deserialization in order to minimize `Registry` copying and keep registry update timings at acceptable values. Performance improvements to `protobuf::State` should be explored in order to make it usable in the registrar without regressing on the performance of this approach. Review: https://reviews.apache.org/r/58355/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3b70d417 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3b70d417 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3b70d417 Branch: refs/heads/master Commit: 3b70d417a8642eeb0efb562d45cc0f7a7809f54f Parents: 99acafa Author: Ilya Pronin <[email protected]> Authored: Wed Apr 19 16:22:15 2017 -0700 Committer: Benjamin Mahler <[email protected]> Committed: Wed Apr 19 16:42:51 2017 -0700 ---------------------------------------------------------------------- src/master/main.cpp | 5 +- src/master/registrar.cpp | 127 +++++++++++++++++++++++++------------ src/master/registrar.hpp | 4 +- src/tests/cluster.cpp | 2 +- src/tests/cluster.hpp | 4 +- src/tests/mock_registrar.cpp | 4 +- src/tests/mock_registrar.hpp | 4 +- src/tests/registrar_tests.cpp | 4 +- 8 files changed, 98 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/3b70d417/src/master/main.cpp ---------------------------------------------------------------------- diff --git a/src/master/main.cpp 
b/src/master/main.cpp index 90d159e..95a482b 100644 --- a/src/master/main.cpp +++ b/src/master/main.cpp @@ -37,7 +37,7 @@ #ifndef __WINDOWS__ #include <mesos/state/log.hpp> #endif // __WINDOWS__ -#include <mesos/state/protobuf.hpp> +#include <mesos/state/state.hpp> #include <mesos/state/storage.hpp> #include <mesos/zookeeper/detector.hpp> @@ -392,8 +392,7 @@ int main(int argc, char** argv) CHECK_NOTNULL(storage); - mesos::state::protobuf::State* state = - new mesos::state::protobuf::State(storage); + mesos::state::State* state = new mesos::state::State(storage); Registrar* registrar = new Registrar(flags, state, READONLY_HTTP_AUTHENTICATION_REALM); http://git-wip-us.apache.org/repos/asf/mesos/blob/3b70d417/src/master/registrar.cpp ---------------------------------------------------------------------- diff --git a/src/master/registrar.cpp b/src/master/registrar.cpp index 0029cc7..82ec3d9 100644 --- a/src/master/registrar.cpp +++ b/src/master/registrar.cpp @@ -19,7 +19,7 @@ #include <mesos/type_utils.hpp> -#include <mesos/state/protobuf.hpp> +#include <mesos/state/state.hpp> #include <process/defer.hpp> #include <process/dispatch.hpp> @@ -44,8 +44,8 @@ #include "master/registrar.hpp" #include "master/registry.hpp" -using mesos::state::protobuf::State; -using mesos::state::protobuf::Variable; +using mesos::state::State; +using mesos::state::Variable; using process::dispatch; using process::spawn; @@ -89,9 +89,9 @@ public: const Option<string>& _authenticationRealm) : ProcessBase(process::ID::generate("registrar")), metrics(*this), + state(_state), updating(false), flags(_flags), - state(_state), authenticationRealm(_authenticationRealm) {} virtual ~RegistrarProcess() {} @@ -108,19 +108,20 @@ protected: "/registry", authenticationRealm.get(), registryHelp(), - &RegistrarProcess::registry); + &RegistrarProcess::getRegistry); } else { route( "/registry", registryHelp(), - lambda::bind(&RegistrarProcess::registry, this, lambda::_1, None())); + lambda::bind( + 
&RegistrarProcess::getRegistry, this, lambda::_1, None())); } } private: // HTTP handlers. // /registrar(N)/registry - Future<Response> registry( + Future<Response> getRegistry( const Request& request, const Option<Principal>&); static string registryHelp(); @@ -186,8 +187,8 @@ private: Future<double> _registry_size_bytes() { - if (variable.isSome()) { - return variable.get().get().ByteSize(); + if (registry.isSome()) { + return registry->ByteSize(); } return Failure("Not recovered yet"); @@ -196,14 +197,15 @@ private: // Continuations. void _recover( const MasterInfo& info, - const Future<Variable<Registry>>& recovery); + const Future<Variable>& recovery); void __recover(const Future<bool>& recover); Future<bool> _apply(Owned<Operation> operation); // Helper for updating state (performing store). void update(); void _update( - const Future<Option<Variable<Registry>>>& store, + const Future<Option<Variable>>& store, + const Owned<Registry>& updatedRegistry, deque<Owned<Operation>> operations); // Fails all pending operations and transitions the Registrar @@ -212,12 +214,24 @@ private: // performing more State storage operations. void abort(const string& message); - Option<Variable<Registry>> variable; + // TODO(ipronin): We use the "untyped" `State` class here and perform + // the protobuf (de)serialization manually within the Registrar, because + // the use of `protobuf::State` incurs a dramatic performance cost from + // protobuf copying. We should explore using `protobuf::State`, which will + // require move support and other copy elimination to maintain the + // performance of the current approach. + State* state; + + // Per the TODO above, we store both serialized and deserialized versions + // of the `Registry` protobuf. If we're able to move to `protobuf::State`, + // we could just store a single `protobuf::state::Variable<Registry>`.
+ Option<Variable> variable; + Option<Registry> registry; + deque<Owned<Operation>> operations; bool updating; // Used to signify fetching (recovering) or storing. const Flags flags; - State* state; // Used to compose our operations with recovery. Option<Owned<Promise<Registry>>> recovered; @@ -256,14 +270,14 @@ void fail(deque<Owned<Operation>>* operations, const string& message) } -Future<Response> RegistrarProcess::registry( +Future<Response> RegistrarProcess::getRegistry( const Request& request, const Option<Principal>&) { JSON::Object result; - if (variable.isSome()) { - result = JSON::protobuf(variable.get().get()); + if (registry.isSome()) { + result = JSON::protobuf(registry.get()); } return OK(result, request.url.query.get("jsonp")); @@ -331,10 +345,10 @@ Future<Registry> RegistrarProcess::recover(const MasterInfo& info) VLOG(1) << "Recovering registrar"; metrics.state_fetch.start(); - state->fetch<Registry>("registry") + state->fetch("registry") .after(flags.registry_fetch_timeout, lambda::bind( - &timeout<Variable<Registry>>, + &timeout<Variable>, "fetch", flags.registry_fetch_timeout, lambda::_1)) @@ -349,7 +363,7 @@ Future<Registry> RegistrarProcess::recover(const MasterInfo& info) void RegistrarProcess::_recover( const MasterInfo& info, - const Future<Variable<Registry>>& recovery) + const Future<Variable>& recovery) { updating = false; @@ -358,24 +372,38 @@ void RegistrarProcess::_recover( if (!recovery.isReady()) { recovered.get()->fail("Failed to recover registrar: " + (recovery.isFailed() ? recovery.failure() : "discarded")); - } else { - Duration elapsed = metrics.state_fetch.stop(); + return; + } - LOG(INFO) << "Successfully fetched the registry" - << " (" << Bytes(recovery.get().get().ByteSize()) << ")" - << " in " << elapsed; + // Deserialize the registry. 
+ Try<Registry> deserialized = + ::protobuf::deserialize<Registry>(recovery->value()); + if (deserialized.isError()) { + recovered.get()->fail("Failed to recover registrar: " + + deserialized.error()); + return; + } - // Save the registry. - variable = recovery.get(); + Duration elapsed = metrics.state_fetch.stop(); - // Perform the Recover operation to add the new MasterInfo. - Owned<Operation> operation(new Recover(info)); - operations.push_back(operation); - operation->future() - .onAny(defer(self(), &Self::__recover, lambda::_1)); + LOG(INFO) << "Successfully fetched the registry" + << " (" << Bytes(deserialized->ByteSize()) << ")" + << " in " << elapsed; - update(); - } + // Save the registry. + variable = recovery.get(); + + // Workaround for immovable protobuf messages. + registry = Option<Registry>(Registry()); + registry->Swap(&deserialized.get()); + + // Perform the Recover operation to add the new MasterInfo. + Owned<Operation> operation(new Recover(info)); + operations.push_back(operation); + operation->future() + .onAny(defer(self(), &Self::__recover, lambda::_1)); + + update(); } @@ -397,7 +425,8 @@ void RegistrarProcess::__recover(const Future<bool>& recover) // the Registry with the latest MasterInfo. // Set the promise and un-gate any pending operations. CHECK_SOME(variable); - recovered.get()->set(variable.get().get()); + CHECK_SOME(registry); + recovered.get()->set(registry.get()); } } @@ -446,18 +475,19 @@ void RegistrarProcess::update() updating = true; - // Create a snapshot of the current registry. - Registry registry = variable.get().get(); + // Create a snapshot of the current registry. We use an `Owned` here + // to avoid copying, since protobuf doesn't support move construction. + auto updatedRegistry = Owned<Registry>(new Registry(registry.get())); // Create the 'slaveIDs' accumulator.
hashset<SlaveID> slaveIDs; - foreach (const Registry::Slave& slave, registry.slaves().slaves()) { + foreach (const Registry::Slave& slave, updatedRegistry->slaves().slaves()) { slaveIDs.insert(slave.info().id()); } foreach (Owned<Operation>& operation, operations) { // No need to process the result of the operation. - (*operation)(&registry, &slaveIDs); + (*operation)(updatedRegistry.get(), &slaveIDs); } LOG(INFO) << "Applied " << operations.size() << " operations in " @@ -465,14 +495,25 @@ void RegistrarProcess::update() // Perform the store, and time the operation. metrics.state_store.start(); - state->store(variable.get().mutate(registry)) + + // Serialize updated registry. + Try<string> serialized = ::protobuf::serialize(*updatedRegistry); + if (serialized.isError()) { + string message = "Failed to update registry: " + serialized.error(); + fail(&operations, message); + abort(message); + return; + } + + state->store(variable->mutate(serialized.get())) .after(flags.registry_store_timeout, lambda::bind( - &timeout<Option<Variable<Registry>>>, + &timeout<Option<Variable>>, "store", flags.registry_store_timeout, lambda::_1)) - .onAny(defer(self(), &Self::_update, lambda::_1, operations)); + .onAny(defer( + self(), &Self::_update, lambda::_1, updatedRegistry, operations)); // Clear the operations, _update will transition the Promises! operations.clear(); @@ -480,7 +521,8 @@ void RegistrarProcess::_update( - const Future<Option<Variable<Registry>>>& store, + const Future<Option<Variable>>& store, + const Owned<Registry>& updatedRegistry, deque<Owned<Operation>> applied) { updating = false; @@ -508,6 +550,7 @@ void RegistrarProcess::_update( LOG(INFO) << "Successfully updated the registry in " << elapsed; variable = store.get().get(); + registry->Swap(updatedRegistry.get()); // Remove the operations.
while (!applied.empty()) { http://git-wip-us.apache.org/repos/asf/mesos/blob/3b70d417/src/master/registrar.hpp ---------------------------------------------------------------------- diff --git a/src/master/registrar.hpp b/src/master/registrar.hpp index a70132b..c439f6a 100644 --- a/src/master/registrar.hpp +++ b/src/master/registrar.hpp @@ -19,7 +19,7 @@ #include <mesos/mesos.hpp> -#include <mesos/state/protobuf.hpp> +#include <mesos/state/state.hpp> #include <process/future.hpp> #include <process/owned.hpp> @@ -92,7 +92,7 @@ class Registrar { public: Registrar(const Flags& flags, - mesos::state::protobuf::State* state, + mesos::state::State* state, const Option<std::string>& authenticationRealm = None()); virtual ~Registrar(); http://git-wip-us.apache.org/repos/asf/mesos/blob/3b70d417/src/tests/cluster.cpp ---------------------------------------------------------------------- diff --git a/src/tests/cluster.cpp b/src/tests/cluster.cpp index 02590a2..a4f57e0 100644 --- a/src/tests/cluster.cpp +++ b/src/tests/cluster.cpp @@ -254,7 +254,7 @@ Try<process::Owned<Master>> Master::start( } // Instantiate some other master dependencies. 
- master->state.reset(new mesos::state::protobuf::State(master->storage.get())); + master->state.reset(new mesos::state::State(master->storage.get())); master->registrar.reset(new MockRegistrar( flags, master->state.get(), master::READONLY_HTTP_AUTHENTICATION_REALM)); http://git-wip-us.apache.org/repos/asf/mesos/blob/3b70d417/src/tests/cluster.hpp ---------------------------------------------------------------------- diff --git a/src/tests/cluster.hpp b/src/tests/cluster.hpp index 250b12f..6563412 100644 --- a/src/tests/cluster.hpp +++ b/src/tests/cluster.hpp @@ -34,7 +34,7 @@ #include <mesos/state/in_memory.hpp> #include <mesos/state/log.hpp> -#include <mesos/state/protobuf.hpp> +#include <mesos/state/state.hpp> #include <mesos/state/storage.hpp> #include <mesos/zookeeper/url.hpp> @@ -123,7 +123,7 @@ private: process::Owned<mesos::master::detector::MasterDetector> detector; process::Owned<mesos::log::Log> log; process::Owned<mesos::state::Storage> storage; - process::Owned<mesos::state::protobuf::State> state; + process::Owned<mesos::state::State> state; public: // Exposed for testing and mocking purposes. 
We always use a // `MockRegistrar` in case the test case wants to inspect how the http://git-wip-us.apache.org/repos/asf/mesos/blob/3b70d417/src/tests/mock_registrar.cpp ---------------------------------------------------------------------- diff --git a/src/tests/mock_registrar.cpp b/src/tests/mock_registrar.cpp index 8643e4c..0a877b2 100644 --- a/src/tests/mock_registrar.cpp +++ b/src/tests/mock_registrar.cpp @@ -18,7 +18,7 @@ #include <gmock/gmock.h> -#include <mesos/state/protobuf.hpp> +#include <mesos/state/state.hpp> #include <process/future.hpp> #include <process/owned.hpp> @@ -46,7 +46,7 @@ namespace tests { MockRegistrar::MockRegistrar( const master::Flags& flags, - mesos::state::protobuf::State* state, + mesos::state::State* state, const Option<string>& authenticationRealm) : Registrar(flags, state, authenticationRealm) { http://git-wip-us.apache.org/repos/asf/mesos/blob/3b70d417/src/tests/mock_registrar.hpp ---------------------------------------------------------------------- diff --git a/src/tests/mock_registrar.hpp b/src/tests/mock_registrar.hpp index cdcc699..92c3994 100644 --- a/src/tests/mock_registrar.hpp +++ b/src/tests/mock_registrar.hpp @@ -21,7 +21,7 @@ #include <gmock/gmock.h> -#include <mesos/state/protobuf.hpp> +#include <mesos/state/state.hpp> #include <process/future.hpp> #include <process/owned.hpp> @@ -40,7 +40,7 @@ class MockRegistrar : public mesos::internal::master::Registrar { public: MockRegistrar(const master::Flags& flags, - mesos::state::protobuf::State* state, + mesos::state::State* state, const Option<std::string>& authenticationRealm = None()); virtual ~MockRegistrar(); http://git-wip-us.apache.org/repos/asf/mesos/blob/3b70d417/src/tests/registrar_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/registrar_tests.cpp b/src/tests/registrar_tests.cpp index 5c6fb56..e2c38d3 100644 --- a/src/tests/registrar_tests.cpp +++ b/src/tests/registrar_tests.cpp @@ -29,7 +29,7 @@ #include 
<mesos/log/log.hpp> #include <mesos/state/log.hpp> -#include <mesos/state/protobuf.hpp> +#include <mesos/state/state.hpp> #include <mesos/state/storage.hpp> #include <process/clock.hpp> @@ -113,8 +113,8 @@ using namespace mesos::internal::master::weights; using mesos::http::authentication::BasicAuthenticatorFactory; using mesos::state::LogStorage; +using mesos::state::State; using mesos::state::Storage; -using mesos::state::protobuf::State; using state::Entry;
