This is an automated email from the ASF dual-hosted git repository. huijun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push: new 1fe99ed Integrate runtime config and rate limit (#2846) 1fe99ed is described below commit 1fe99edbe926b1a620ad32f5c5ec30eaa8cd8f7d Author: Ning Wang <nw...@twitter.com> AuthorDate: Thu Apr 12 14:57:12 2018 -0700 Integrate runtime config and rate limit (#2846) * Integrate runtime config and rate limit * more clean up * Fix integration test by avoiding changing hydrated topology in stmgr --- .../src/cpp/config/topology-config-helper.cpp | 43 ++++++++++++++--- .../common/src/cpp/config/topology-config-helper.h | 15 ++++++ .../cpp/config/topology-config-helper_unittest.cpp | 42 +++++++++++++++- heron/stmgr/src/cpp/manager/instance-server.cpp | 24 ++++++---- heron/stmgr/src/cpp/manager/stmgr.cpp | 42 ++++++++++++++-- heron/stmgr/src/cpp/manager/stmgr.h | 6 +++ heron/stmgr/tests/cpp/server/stmgr_unittest.cpp | 56 ++++++++++++++++++++++ heron/tmaster/src/cpp/manager/tcontroller.cpp | 4 +- heron/tmaster/src/cpp/manager/tmaster.cpp | 23 +++++---- heron/tmaster/src/cpp/manager/tmaster.h | 10 ++-- .../tmaster/tests/cpp/server/tmaster_unittest.cpp | 3 +- 11 files changed, 228 insertions(+), 40 deletions(-) diff --git a/heron/common/src/cpp/config/topology-config-helper.cpp b/heron/common/src/cpp/config/topology-config-helper.cpp index e84a7aa..3200330 100644 --- a/heron/common/src/cpp/config/topology-config-helper.cpp +++ b/heron/common/src/cpp/config/topology-config-helper.cpp @@ -29,6 +29,9 @@ namespace heron { namespace config { +static const char TOPOLOGY_CONFIG_KEY[] = "_topology_"; +static const char RUNTIME_CONFIG_POSTFIX[] = ":runtime"; + TopologyConfigVars::TopologyReliabilityMode StringToReliabilityMode(const std::string& _mode) { if (_mode == "ATMOST_ONCE") { return TopologyConfigVars::TopologyReliabilityMode::ATMOST_ONCE; @@ -411,6 +414,20 @@ bool TopologyConfigHelper::DropTuplesUponBackpressure(const proto::api::Topology TopologyConfigVars::TOPOLOGY_DROPTUPLES_UPON_BACKPRESSURE, false); } +std::string TopologyConfigHelper::GetRuntimeConfigKey(const std::string& _key) { + return _key + RUNTIME_CONFIG_POSTFIX; +} + +// Convert configs in map to runtime configs (append runtime postfix) +void TopologyConfigHelper::ConvertToRuntimeConfigs( + const std::map<std::string, std::string>& _origin, + std::map<std::string, std::string>& _retval) { + std::map<std::string, std::string>::const_iterator it; + for (it = _origin.begin(); it != _origin.end(); ++it) { + _retval[GetRuntimeConfigKey(it->first)] = it->second; + } +} + // Return topology level config void TopologyConfigHelper::GetTopologyConfig(const proto::api::Topology& _topology, std::map<std::string, std::string>& retval) { @@ -514,12 +531,9 @@ bool TopologyConfigHelper::GetBooleanConfigValue(const proto::api::Topology& _to const std::string& _config_name, bool _default_value) { static const std::string value_true_ = "true"; - const proto::api::Config& cfg = _topology.topology_config(); - const std::string value = GetConfigValue(cfg, _config_name, ""); - if (!value.empty()) { - return value_true_.compare(value.c_str()) == 0; - } - return _default_value; + const std::string value = GetTopologyConfigValue(_topology, _config_name, ""); + + return value_true_.compare(value.c_str()) == 0; } // Convert topology config to a key value map @@ -530,6 +544,14 @@ void TopologyConfigHelper::ConvertConfigToKVMap(const proto::api::Config& _confi } } +const std::string TopologyConfigHelper::GetTopologyConfigValue( + const proto::api::Topology& _topology, + const std::string& _key, + const std::string& _default) { + const proto::api::Config& cfg = _topology.topology_config(); + return GetConfigValue(cfg, _key, _default); +} + const std::string TopologyConfigHelper::GetComponentConfigValue( const proto::api::Topology& _topology, const std::string& _component, @@ -552,13 +574,20 @@ const std::string TopologyConfigHelper::GetComponentConfigValue( sp_int64 TopologyConfigHelper::GetComponentOutputBPS(const proto::api::Topology& _topology, const std::string& _component) { - const std::string value = GetComponentConfigValue(_topology, _component, + const std::string init_value = GetComponentConfigValue(_topology, _component, TopologyConfigVars::TOPOLOGY_COMPONENT_OUTPUT_BPS, ""); + const std::string value = GetComponentConfigValue(_topology, _component, + GetRuntimeConfigKey(TopologyConfigVars::TOPOLOGY_COMPONENT_OUTPUT_BPS), init_value); if (!value.empty()) { return atol(value.c_str()); } + return -1; // default to -1 (no rate limit) } + +const char* TopologyConfigHelper::GetReservedTopologyConfigKey() { + return TOPOLOGY_CONFIG_KEY; +} } // namespace config } // namespace heron diff --git a/heron/common/src/cpp/config/topology-config-helper.h b/heron/common/src/cpp/config/topology-config-helper.h index a8dd41b..b4b8651 100644 --- a/heron/common/src/cpp/config/topology-config-helper.h +++ b/heron/common/src/cpp/config/topology-config-helper.h @@ -136,6 +136,13 @@ class TopologyConfigHelper { // Do we want to drop tuples upon backpressure detection static bool DropTuplesUponBackpressure(const proto::api::Topology& _topology); + // Get runtime config key + static std::string GetRuntimeConfigKey(const std::string& key); + + // Convert configs in map to runtime configs (append runtime postfix) + static void ConvertToRuntimeConfigs(const std::map<std::string, std::string>& _origin, + std::map<std::string, std::string>& _retval); + // Return topology level config static void GetTopologyConfig(const proto::api::Topology& _topology, std::map<std::string, std::string>& retval); @@ -155,6 +162,11 @@ class TopologyConfigHelper { const std::string& _component_name, const std::map<std::string, std::string>& config); + // Get the topology config value given the config key + static const std::string GetTopologyConfigValue(const proto::api::Topology& _topology, + const std::string& _key, + const std::string& _default); + // Get the config value given component name and config key static const std::string GetComponentConfigValue(const proto::api::Topology& _topology, const std::string& _component, @@ -170,6 +182,9 @@ class TopologyConfigHelper { static sp_int64 GetComponentOutputBPS(const proto::api::Topology& _topology, const std::string& _component); + // Get reserved topology config key. + static const char* GetReservedTopologyConfigKey(); + private: static bool GetBooleanConfigValue(const proto::api::Topology& _topology, const std::string& _config_name, diff --git a/heron/common/tests/cpp/config/topology-config-helper_unittest.cpp b/heron/common/tests/cpp/config/topology-config-helper_unittest.cpp index f99a3d5..698c772 100644 --- a/heron/common/tests/cpp/config/topology-config-helper_unittest.cpp +++ b/heron/common/tests/cpp/config/topology-config-helper_unittest.cpp @@ -134,13 +134,23 @@ TEST(TopologyConfigHelper, GetAndSetTopologyConfig) { "test_topology", "123", 3, NUM_SPOUT_INSTANCES, 3, NUM_BOLT_INSTANCES, heron::proto::api::SHUFFLE); - // Test init config + // Test initial config std::map<std::string, std::string> old_config; heron::config::TopologyConfigHelper::GetTopologyConfig(*test_topology, old_config); EXPECT_EQ(old_config[heron::config::TopologyConfigVars::TOPOLOGY_MESSAGE_TIMEOUT_SECS], MESSAGE_TIMEOUT); EXPECT_EQ(old_config[TOPOLOGY_USER_CONFIG], TOPOLOGY_USER_CONFIG_VALUE); + // Test GetComponentConfigValue function + EXPECT_EQ( + heron::config::TopologyConfigHelper::GetTopologyConfigValue( + *test_topology, TOPOLOGY_USER_CONFIG, ""), + TOPOLOGY_USER_CONFIG_VALUE); + EXPECT_EQ( + heron::config::TopologyConfigHelper::GetTopologyConfigValue( + *test_topology, TOPOLOGY_USER_CONFIG + ".bad", ""), + ""); + // Set and then test updated config std::string runtime_user_config_key = TOPOLOGY_USER_CONFIG + ":runtime"; std::map<std::string, std::string> update; @@ -171,7 +181,7 @@ TEST(TopologyConfigHelper, GetAndSetComponentConfig) { std::string non_test_spout = "test_spout2"; std::string test_bolt = "test_bolt2"; std::string non_test_bolt = "test_bolt1"; - // Test init config + // Test initial config std::map<std::string, std::string> old_config; heron::config::TopologyConfigHelper::GetComponentConfig(*test_topology, test_spout, old_config); EXPECT_EQ(old_config[heron::config::TopologyConfigVars::TOPOLOGY_COMPONENT_PARALLELISM], @@ -183,6 +193,16 @@ TEST(TopologyConfigHelper, GetAndSetComponentConfig) { std::to_string(NUM_BOLT_INSTANCES)); EXPECT_EQ(old_config[BOLT_USER_CONFIG], BOLT_USER_CONFIG_VALUE); + // Test GetComponentConfigValue function + EXPECT_EQ( + heron::config::TopologyConfigHelper::GetComponentConfigValue( + *test_topology, test_spout, SPOUT_USER_CONFIG, ""), + SPOUT_USER_CONFIG_VALUE); + EXPECT_EQ( + heron::config::TopologyConfigHelper::GetComponentConfigValue( + *test_topology, test_spout, SPOUT_USER_CONFIG + ".bad", ""), + ""); + // Set user configs to new values std::string runtime_spout_user_config_key = SPOUT_USER_CONFIG + ":runtime"; std::string runtime_bolt_user_config_key = BOLT_USER_CONFIG + ":runtime"; @@ -235,6 +255,24 @@ TEST(TopologyConfigHelper, GetAndSetComponentConfig) { EXPECT_EQ(updated_config[runtime_bolt_user_config_key], NEW_BOLT_USER_CONFIG_VALUE_2); } +TEST(TopologyConfigHelper, GetRuntimeConfigKey) { + EXPECT_EQ( + heron::config::TopologyConfigHelper::GetRuntimeConfigKey("conf.test1"), + "conf.test1:runtime"); +} + +TEST(TopologyConfigHelper, ConvertToRuntimeConfigs) { + std::map<std::string, std::string> original_config; + original_config["conf.test1"] = "a"; + original_config["conf.test2"] = "b"; + + std::map<std::string, std::string> runtime_config; + heron::config::TopologyConfigHelper::ConvertToRuntimeConfigs(original_config, runtime_config); + + EXPECT_EQ(runtime_config["conf.test1:runtime"], "a"); + EXPECT_EQ(runtime_config["conf.test2:runtime"], "b"); +} + int main(int argc, char **argv) { heron::common::Initialize(argv[0]); testing::InitGoogleTest(&argc, argv); diff --git a/heron/stmgr/src/cpp/manager/instance-server.cpp b/heron/stmgr/src/cpp/manager/instance-server.cpp index 7288749..30e787e 100644 --- a/heron/stmgr/src/cpp/manager/instance-server.cpp +++ b/heron/stmgr/src/cpp/manager/instance-server.cpp @@ -15,9 +15,11 @@ */ #include "manager/instance-server.h" + #include <iostream> -#include <unordered_set> +#include <map> #include <string> +#include <unordered_set> #include <vector> #include "manager/checkpoint-gateway.h" #include "util/neighbour-calculator.h" @@ -422,6 +424,9 @@ void InstanceServer::DrainCheckpoint(sp_int32 _task_id, void InstanceServer::BroadcastNewPhysicalPlan(const proto::system::PhysicalPlan& _pplan) { // TODO(vikasr) We do not handle any changes to our local assignment + LOG(INFO) << "Broadcasting new PhysicalPlan:"; + config::TopologyConfigHelper::LogTopology(_pplan.topology()); + ComputeLocalSpouts(_pplan); proto::stmgr::NewInstanceAssignmentMessage new_assignment; new_assignment.mutable_pplan()->CopyFrom(_pplan); @@ -439,17 +444,18 @@ void InstanceServer::BroadcastNewPhysicalPlan(const proto::system::PhysicalPlan& void InstanceServer::SetRateLimit(const proto::system::PhysicalPlan& _pplan, const std::string& _component, Connection* _conn) const { - sp_int64 read_bsp = + sp_int64 read_bps = config::TopologyConfigHelper::GetComponentOutputBPS(_pplan.topology(), _component); sp_int32 parallelism = config::TopologyConfigHelper::GetComponentParallelism(_pplan.topology(), _component); - sp_int64 burst_read_bsp = read_bsp + read_bsp / 2; - - // There should be parallelism hint and the per instance rate limit should be at least - // one byte per second - if (parallelism > 0 && read_bsp > parallelism && burst_read_bsp > parallelism) { - LOG(INFO) << "Set rate limit in " << _component << " to " << read_bsp << "/" << burst_read_bsp; - _conn->setRateLimit(read_bsp / parallelism, burst_read_bsp / parallelism); + // burst rate is 1.5 x of regular rate + sp_int64 burst_read_bps = read_bps + read_bps / 2; + + LOG(INFO) << "Parallelism of component " << _component << " is " << parallelism; + LOG(INFO) << "Read BPS of component " << _component << " is " << read_bps; + if (parallelism > 0 && read_bps >= 0 && burst_read_bps >= 0) { + LOG(INFO) << "Set rate limit in " << _component << " to " << read_bps << "/" << burst_read_bps; + _conn->setRateLimit(read_bps / parallelism, burst_read_bps / parallelism); } else { LOG(INFO) << "Disable rate limit in " << _component; _conn->disableRateLimit(); diff --git a/heron/stmgr/src/cpp/manager/stmgr.cpp b/heron/stmgr/src/cpp/manager/stmgr.cpp index 3fd805b..738fa03 100644 --- a/heron/stmgr/src/cpp/manager/stmgr.cpp +++ b/heron/stmgr/src/cpp/manager/stmgr.cpp @@ -538,6 +538,7 @@ void StMgr::StartTMasterClient() { void StMgr::NewPhysicalPlan(proto::system::PhysicalPlan* _pplan) { LOG(INFO) << "Received a new physical plan from tmaster"; + heron::config::TopologyConfigHelper::LogTopology(_pplan->topology()); // first make sure that we are part of the plan ;) bool found = false; for (sp_int32 i = 0; i < _pplan->stmgrs_size(); ++i) { @@ -561,10 +562,10 @@ void StMgr::NewPhysicalPlan(proto::system::PhysicalPlan* _pplan) { LOG(INFO) << "Topology state changed from " << pplan_->topology().state() << " to " << _pplan->topology().state(); } - proto::api::TopologyState st = _pplan->topology().state(); - _pplan->clear_topology(); - _pplan->mutable_topology()->CopyFrom(*hydrated_topology_); - _pplan->mutable_topology()->set_state(st); + + PatchPhysicalPlanWithHydratedTopology(_pplan, hydrated_topology_); + LOG(INFO) << "Patched with hydrated topology"; + heron::config::TopologyConfigHelper::LogTopology(_pplan->topology()); // TODO(vikasr) Currently we dont check if our role has changed @@ -1115,5 +1116,38 @@ void StMgr::HandleStatefulRestoreDone(proto::system::StatusCode _status, std::string _checkpoint_id, sp_int64 _restore_txid) { tmaster_client_->SendRestoreTopologyStateResponse(_status, _checkpoint_id, _restore_txid); } + +// Patch new physical plan with internal hydrated topology but keep new topology data: +// - new topology state +// - new topology/component config +void StMgr::PatchPhysicalPlanWithHydratedTopology(proto::system::PhysicalPlan* _pplan, + proto::api::Topology* _topology) { + // Back up new topology data (state and configs) + proto::api::TopologyState st = _pplan->topology().state(); + + std::map<std::string, std::string> topology_config; + config::TopologyConfigHelper::GetTopologyConfig(_pplan->topology(), topology_config); + + std::unordered_set<std::string> components; + std::map<std::string, std::map<std::string, std::string>> component_config; + config::TopologyConfigHelper::GetAllComponentNames(_pplan->topology(), components); + for (auto iter = components.begin(); iter != components.end(); ++iter) { + std::map<std::string, std::string> config; + config::TopologyConfigHelper::GetComponentConfig(_pplan->topology(), *iter, topology_config); + component_config[*iter] = config; + } + + // Copy hydrated topology into pplan + _pplan->clear_topology(); + _pplan->mutable_topology()->CopyFrom(*_topology); + + // Restore new topology data + _pplan->mutable_topology()->set_state(st); + config::TopologyConfigHelper::SetTopologyConfig(_pplan->mutable_topology(), topology_config); + for (auto iter = components.begin(); iter != components.end(); ++iter) { + config::TopologyConfigHelper::SetComponentConfig(_pplan->mutable_topology(), *iter, + component_config[*iter]); + } +} } // namespace stmgr } // namespace heron diff --git a/heron/stmgr/src/cpp/manager/stmgr.h b/heron/stmgr/src/cpp/manager/stmgr.h index cd0d0da..48e30be 100644 --- a/heron/stmgr/src/cpp/manager/stmgr.h +++ b/heron/stmgr/src/cpp/manager/stmgr.h @@ -187,6 +187,12 @@ class StMgr { void HandleStatefulRestoreDone(proto::system::StatusCode _status, std::string _checkpoint_id, sp_int64 _restore_txid); + // Patch new physical plan with internal hydrated topology but keep new topology data: + // - new topology state + // - new topology/component config + static void PatchPhysicalPlanWithHydratedTopology(proto::system::PhysicalPlan* _pplan, + proto::api::Topology* _topology); + heron::common::HeronStateMgr* state_mgr_; proto::system::PhysicalPlan* pplan_; sp_string topology_name_; diff --git a/heron/stmgr/tests/cpp/server/stmgr_unittest.cpp b/heron/stmgr/tests/cpp/server/stmgr_unittest.cpp index 8670304..41363d9 100644 --- a/heron/stmgr/tests/cpp/server/stmgr_unittest.cpp +++ b/heron/stmgr/tests/cpp/server/stmgr_unittest.cpp @@ -1867,6 +1867,62 @@ TEST(StMgr, test_metricsmgr_reconnect) { TearCommonResources(common); } +// Test PatchPhysicalPlanWithHydratedTopology function +TEST(StMgr, test_PatchPhysicalPlanWithHydratedTopology) { + int32_t nSpouts = 2; + int32_t nSpoutInstances = 1; + int32_t nBolts = 3; + int32_t nBoltInstances = 1; + heron::proto::api::Topology* topology = + GenerateDummyTopology("topology_name", + "topology_id", + nSpouts, nSpoutInstances, nBolts, nBoltInstances, + heron::proto::api::SHUFFLE); + + heron::proto::system::PhysicalPlan* pplan = new heron::proto::system::PhysicalPlan(); + pplan->mutable_topology()->CopyFrom(*topology); + + // Verify initial values + EXPECT_EQ( + heron::config::TopologyConfigHelper::GetTopologyConfigValue( + *topology, + heron::config::TopologyConfigVars::TOPOLOGY_MESSAGE_TIMEOUT_SECS, + ""), + "30"); + EXPECT_EQ( + heron::config::TopologyConfigHelper::GetTopologyConfigValue( + pplan->topology(), + heron::config::TopologyConfigVars::TOPOLOGY_MESSAGE_TIMEOUT_SECS, + ""), + "30"); + // Change runtime data in PhysicalPlan and patch it + std::map<std::string, std::string> update; + update["conf.new"] = "test"; + update[heron::config::TopologyConfigVars::TOPOLOGY_MESSAGE_TIMEOUT_SECS] = "10"; + heron::config::TopologyConfigHelper::SetTopologyConfig(pplan->mutable_topology(), update); + + // Verify updated runtime data is still in the patched physical plan + // The topology in the physical plan should have the old name + EXPECT_EQ( + heron::config::TopologyConfigHelper::GetTopologyConfigValue( + *topology, + heron::config::TopologyConfigVars::TOPOLOGY_MESSAGE_TIMEOUT_SECS, + ""), + "30"); // The internal topology object should still have the initial value + EXPECT_EQ( + heron::config::TopologyConfigHelper::GetTopologyConfigValue( + pplan->topology(), + heron::config::TopologyConfigVars::TOPOLOGY_MESSAGE_TIMEOUT_SECS, + ""), + "10"); // The topology object in the physical plan should have the new value + EXPECT_EQ( + heron::config::TopologyConfigHelper::GetTopologyConfigValue( + pplan->topology(), "conf.new", ""), + "test"); // The topology object in the physical plan should have the new config + + delete pplan; +} + int main(int argc, char** argv) { heron::common::Initialize(argv[0]); std::cout << "Current working directory (to find stmgr logs) " diff --git a/heron/tmaster/src/cpp/manager/tcontroller.cpp b/heron/tmaster/src/cpp/manager/tcontroller.cpp index a13784e..d5f933c 100644 --- a/heron/tmaster/src/cpp/manager/tcontroller.cpp +++ b/heron/tmaster/src/cpp/manager/tcontroller.cpp @@ -23,6 +23,7 @@ #include <vector> #include "basics/basics.h" #include "basics/strutils.h" +#include "config/topology-config-helper.h" #include "errors/errors.h" #include "manager/tmaster.h" #include "network/network.h" @@ -298,7 +299,8 @@ bool TController::ParseRuntimeConfig(const std::vector<std::string>& paramters, std::vector<std::string> segments = StrUtils::split(*iter, ":"); if (segments.size() == 2) { // Topology level config - retval[TOPOLOGY_CONFIG_KEY][segments[0]] = segments[1]; + const char* topology_key = config::TopologyConfigHelper::GetReservedTopologyConfigKey(); + retval[topology_key][segments[0]] = segments[1]; } else if (segments.size() == 3) { // Component level config retval[segments[0]][segments[1]] = segments[2]; diff --git a/heron/tmaster/src/cpp/manager/tmaster.cpp b/heron/tmaster/src/cpp/manager/tmaster.cpp index ad2affe..c27ccfe 100644 --- a/heron/tmaster/src/cpp/manager/tmaster.cpp +++ b/heron/tmaster/src/cpp/manager/tmaster.cpp @@ -591,6 +591,9 @@ bool TMaster::UpdateRuntimeConfig(const ComponentConfigMap& _config, VCallback<proto::system::StatusCode> cb) { DCHECK(current_pplan_->topology().IsInitialized()); + LOG(INFO) << "Update runtime config: "; + LogConfig(_config); + // Parse and set the new configs proto::system::PhysicalPlan* new_pplan = new proto::system::PhysicalPlan(); new_pplan->CopyFrom(*current_pplan_); @@ -631,11 +634,12 @@ bool TMaster::UpdateRuntimeConfigInTopology(proto::api::Topology* _topology, DCHECK(_topology->IsInitialized()); ComponentConfigMap::const_iterator iter; + const char* topology_key = config::TopologyConfigHelper::GetReservedTopologyConfigKey(); for (iter = _config.begin(); iter != _config.end(); ++iter) { // Get config for topology or component. std::map<std::string, std::string> runtime_config; - AppendPostfix(iter->second, RUNTIME_CONFIG_POSTFIX, runtime_config); - if (iter->first == TOPOLOGY_CONFIG_KEY) { + config::TopologyConfigHelper::ConvertToRuntimeConfigs(iter->second, runtime_config); + if (iter->first == topology_key) { config::TopologyConfigHelper::SetTopologyConfig(_topology, runtime_config); } else { config::TopologyConfigHelper::SetComponentConfig(_topology, iter->first, runtime_config); @@ -1022,8 +1026,9 @@ bool TMaster::ValidateRuntimeConfigNames(const ComponentConfigMap& _config) cons config::TopologyConfigHelper::GetAllComponentNames(topology, components); ComponentConfigMap::const_iterator iter; + const char* topology_key = config::TopologyConfigHelper::GetReservedTopologyConfigKey(); for (iter = _config.begin(); iter != _config.end(); ++iter) { - if (iter->first != TOPOLOGY_CONFIG_KEY) { + if (iter->first != topology_key) { // It is a component, search for it if (components.find(iter->first) == components.end()) { return false; @@ -1034,12 +1039,12 @@ bool TMaster::ValidateRuntimeConfigNames(const ComponentConfigMap& _config) cons return true; } -void TMaster::AppendPostfix(const ConfigValueMap& _origin, - const std::string& post_fix, - ConfigValueMap& _retval) { - ConfigValueMap::const_iterator it; - for (it = _origin.begin(); it != _origin.end(); ++it) { - _retval[it->first + post_fix] = it->second; +void TMaster::LogConfig(const ComponentConfigMap& _config) { + for (auto iter = _config.begin(); iter != _config.end(); ++iter) { + LOG(INFO) << iter->first << " =>"; + for (auto i = iter->second.begin(); i != iter->second.end(); ++i) { + LOG(INFO) << i->first << " : " << i->second; + } } } diff --git a/heron/tmaster/src/cpp/manager/tmaster.h b/heron/tmaster/src/cpp/manager/tmaster.h index ca2b0d4..defe8fd 100644 --- a/heron/tmaster/src/cpp/manager/tmaster.h +++ b/heron/tmaster/src/cpp/manager/tmaster.h @@ -47,9 +47,6 @@ typedef std::map<std::string, std::string> ConfigValueMap; // From component name to config/value pairs typedef std::map<std::string, std::map<std::string, std::string>> ComponentConfigMap; -const sp_string TOPOLOGY_CONFIG_KEY = "_topology_"; -const sp_string RUNTIME_CONFIG_POSTFIX = ":runtime"; - class TMaster { public: TMaster(const std::string& _zk_hostport, const std::string& _topology_name, @@ -124,6 +121,9 @@ class TMaster { // Function to be called that calls MakePhysicalPlan and sends it to all stmgrs void DoPhysicalPlan(EventLoop::Status _code); + // Log config object + void LogConfig(const ComponentConfigMap& _config); + // Big brother function that does the assignment to the workers // If _new_stmgr is null, this means that there was a plan // existing, but a _new_stmgr joined us. So redo his part @@ -192,10 +192,6 @@ class TMaster { sp_int32 port, const std::string& stmgr_id); - void AppendPostfix(const ConfigValueMap& _origin, - const std::string& post_fix, - ConfigValueMap& _update); - // map of active stmgr id to stmgr state StMgrMap stmgrs_; diff --git a/heron/tmaster/tests/cpp/server/tmaster_unittest.cpp b/heron/tmaster/tests/cpp/server/tmaster_unittest.cpp index 6db68df..0f37dc8 100644 --- a/heron/tmaster/tests/cpp/server/tmaster_unittest.cpp +++ b/heron/tmaster/tests/cpp/server/tmaster_unittest.cpp @@ -735,7 +735,8 @@ TEST(StMgr, test_runtime_config) { std::map<std::string, std::string> validate_good_config; validate_good_config[topology_runtime_config_1] = "1"; validate_good_config[topology_runtime_config_2] = "2"; - validate_good_config_map[heron::tmaster::TOPOLOGY_CONFIG_KEY] = validate_good_config; + const char* topology_key = heron::config::TopologyConfigHelper::GetReservedTopologyConfigKey(); + validate_good_config_map[topology_key] = validate_good_config; validate_good_config_map["spout1"] = validate_good_config; EXPECT_EQ(common.tmaster_->ValidateRuntimeConfig(validate_good_config_map), true); -- To stop receiving notification emails like this one, please contact hui...@apache.org.