huijunwu closed pull request #2846: Integrate runtime config and rate limit
URL: https://github.com/apache/incubator-heron/pull/2846
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/heron/common/src/cpp/config/topology-config-helper.cpp
b/heron/common/src/cpp/config/topology-config-helper.cpp
index e84a7aae55..3200330f70 100644
--- a/heron/common/src/cpp/config/topology-config-helper.cpp
+++ b/heron/common/src/cpp/config/topology-config-helper.cpp
@@ -29,6 +29,9 @@
namespace heron {
namespace config {
+static const char TOPOLOGY_CONFIG_KEY[] = "_topology_";
+static const char RUNTIME_CONFIG_POSTFIX[] = ":runtime";
+
TopologyConfigVars::TopologyReliabilityMode StringToReliabilityMode(const
std::string& _mode) {
if (_mode == "ATMOST_ONCE") {
return TopologyConfigVars::TopologyReliabilityMode::ATMOST_ONCE;
@@ -411,6 +414,20 @@ bool
TopologyConfigHelper::DropTuplesUponBackpressure(const proto::api::Topology
TopologyConfigVars::TOPOLOGY_DROPTUPLES_UPON_BACKPRESSURE, false);
}
+std::string TopologyConfigHelper::GetRuntimeConfigKey(const std::string& _key)
{
+ return _key + RUNTIME_CONFIG_POSTFIX;
+}
+
+// Convert configs in map to runtime configs (append runtime postfix)
+void TopologyConfigHelper::ConvertToRuntimeConfigs(
+ const std::map<std::string, std::string>& _origin,
+ std::map<std::string, std::string>& _retval) {
+ std::map<std::string, std::string>::const_iterator it;
+ for (it = _origin.begin(); it != _origin.end(); ++it) {
+ _retval[GetRuntimeConfigKey(it->first)] = it->second;
+ }
+}
+
// Return topology level config
void TopologyConfigHelper::GetTopologyConfig(const proto::api::Topology&
_topology,
std::map<std::string,
std::string>& retval) {
@@ -514,12 +531,9 @@ bool TopologyConfigHelper::GetBooleanConfigValue(const
proto::api::Topology& _to
const std::string&
_config_name,
bool _default_value) {
static const std::string value_true_ = "true";
- const proto::api::Config& cfg = _topology.topology_config();
- const std::string value = GetConfigValue(cfg, _config_name, "");
- if (!value.empty()) {
- return value_true_.compare(value.c_str()) == 0;
- }
- return _default_value;
+ const std::string value = GetTopologyConfigValue(_topology, _config_name,
"");
+
+ return value_true_.compare(value.c_str()) == 0;
}
// Convert topology config to a key value map
@@ -530,6 +544,14 @@ void TopologyConfigHelper::ConvertConfigToKVMap(const
proto::api::Config& _confi
}
}
+const std::string TopologyConfigHelper::GetTopologyConfigValue(
+ const proto::api::Topology& _topology,
+ const std::string& _key,
+ const std::string& _default) {
+ const proto::api::Config& cfg = _topology.topology_config();
+ return GetConfigValue(cfg, _key, _default);
+}
+
const std::string TopologyConfigHelper::GetComponentConfigValue(
const proto::api::Topology& _topology,
const std::string& _component,
@@ -552,13 +574,20 @@ const std::string
TopologyConfigHelper::GetComponentConfigValue(
sp_int64 TopologyConfigHelper::GetComponentOutputBPS(const
proto::api::Topology& _topology,
const std::string&
_component) {
- const std::string value = GetComponentConfigValue(_topology, _component,
+ const std::string init_value = GetComponentConfigValue(_topology, _component,
TopologyConfigVars::TOPOLOGY_COMPONENT_OUTPUT_BPS, "");
+ const std::string value = GetComponentConfigValue(_topology, _component,
+ GetRuntimeConfigKey(TopologyConfigVars::TOPOLOGY_COMPONENT_OUTPUT_BPS),
init_value);
if (!value.empty()) {
return atol(value.c_str());
}
+
return -1; // default to -1 (no rate limit)
}
+
+const char* TopologyConfigHelper::GetReservedTopologyConfigKey() {
+ return TOPOLOGY_CONFIG_KEY;
+}
} // namespace config
} // namespace heron
diff --git a/heron/common/src/cpp/config/topology-config-helper.h
b/heron/common/src/cpp/config/topology-config-helper.h
index a8dd41b3f0..b4b8651473 100644
--- a/heron/common/src/cpp/config/topology-config-helper.h
+++ b/heron/common/src/cpp/config/topology-config-helper.h
@@ -136,6 +136,13 @@ class TopologyConfigHelper {
// Do we want to drop tuples upon backpressure detection
static bool DropTuplesUponBackpressure(const proto::api::Topology&
_topology);
+ // Get runtime config key
+ static std::string GetRuntimeConfigKey(const std::string& key);
+
+ // Convert configs in map to runtime configs (append runtime postfix)
+ static void ConvertToRuntimeConfigs(const std::map<std::string,
std::string>& _origin,
+ std::map<std::string, std::string>&
_retval);
+
// Return topology level config
static void GetTopologyConfig(const proto::api::Topology& _topology,
std::map<std::string, std::string>& retval);
@@ -155,6 +162,11 @@ class TopologyConfigHelper {
const std::string& _component_name,
const std::map<std::string, std::string>&
config);
+ // Get the topology config value given the config key
+ static const std::string GetTopologyConfigValue(const proto::api::Topology&
_topology,
+ const std::string& _key,
+ const std::string& _default);
+
// Get the config value given component name and config key
static const std::string GetComponentConfigValue(const proto::api::Topology&
_topology,
const std::string&
_component,
@@ -170,6 +182,9 @@ class TopologyConfigHelper {
static sp_int64 GetComponentOutputBPS(const proto::api::Topology& _topology,
const std::string& _component);
+ // Get reserved topology config key.
+ static const char* GetReservedTopologyConfigKey();
+
private:
static bool GetBooleanConfigValue(const proto::api::Topology& _topology,
const std::string& _config_name,
diff --git a/heron/common/tests/cpp/config/topology-config-helper_unittest.cpp
b/heron/common/tests/cpp/config/topology-config-helper_unittest.cpp
index f99a3d5c04..698c772887 100644
--- a/heron/common/tests/cpp/config/topology-config-helper_unittest.cpp
+++ b/heron/common/tests/cpp/config/topology-config-helper_unittest.cpp
@@ -134,13 +134,23 @@ TEST(TopologyConfigHelper, GetAndSetTopologyConfig) {
"test_topology", "123", 3, NUM_SPOUT_INSTANCES, 3, NUM_BOLT_INSTANCES,
heron::proto::api::SHUFFLE);
- // Test init config
+ // Test initial config
std::map<std::string, std::string> old_config;
heron::config::TopologyConfigHelper::GetTopologyConfig(*test_topology,
old_config);
EXPECT_EQ(old_config[heron::config::TopologyConfigVars::TOPOLOGY_MESSAGE_TIMEOUT_SECS],
MESSAGE_TIMEOUT);
EXPECT_EQ(old_config[TOPOLOGY_USER_CONFIG], TOPOLOGY_USER_CONFIG_VALUE);
+ // Test GetComponentConfigValue function
+ EXPECT_EQ(
+ heron::config::TopologyConfigHelper::GetTopologyConfigValue(
+ *test_topology, TOPOLOGY_USER_CONFIG, ""),
+ TOPOLOGY_USER_CONFIG_VALUE);
+ EXPECT_EQ(
+ heron::config::TopologyConfigHelper::GetTopologyConfigValue(
+ *test_topology, TOPOLOGY_USER_CONFIG + ".bad", ""),
+ "");
+
// Set and then test updated config
std::string runtime_user_config_key = TOPOLOGY_USER_CONFIG + ":runtime";
std::map<std::string, std::string> update;
@@ -171,7 +181,7 @@ TEST(TopologyConfigHelper, GetAndSetComponentConfig) {
std::string non_test_spout = "test_spout2";
std::string test_bolt = "test_bolt2";
std::string non_test_bolt = "test_bolt1";
- // Test init config
+ // Test initial config
std::map<std::string, std::string> old_config;
heron::config::TopologyConfigHelper::GetComponentConfig(*test_topology,
test_spout, old_config);
EXPECT_EQ(old_config[heron::config::TopologyConfigVars::TOPOLOGY_COMPONENT_PARALLELISM],
@@ -183,6 +193,16 @@ TEST(TopologyConfigHelper, GetAndSetComponentConfig) {
std::to_string(NUM_BOLT_INSTANCES));
EXPECT_EQ(old_config[BOLT_USER_CONFIG], BOLT_USER_CONFIG_VALUE);
+ // Test GetComponentConfigValue function
+ EXPECT_EQ(
+ heron::config::TopologyConfigHelper::GetComponentConfigValue(
+ *test_topology, test_spout, SPOUT_USER_CONFIG, ""),
+ SPOUT_USER_CONFIG_VALUE);
+ EXPECT_EQ(
+ heron::config::TopologyConfigHelper::GetComponentConfigValue(
+ *test_topology, test_spout, SPOUT_USER_CONFIG + ".bad", ""),
+ "");
+
// Set user configs to new values
std::string runtime_spout_user_config_key = SPOUT_USER_CONFIG + ":runtime";
std::string runtime_bolt_user_config_key = BOLT_USER_CONFIG + ":runtime";
@@ -235,6 +255,24 @@ TEST(TopologyConfigHelper, GetAndSetComponentConfig) {
EXPECT_EQ(updated_config[runtime_bolt_user_config_key],
NEW_BOLT_USER_CONFIG_VALUE_2);
}
+TEST(TopologyConfigHelper, GetRuntimeConfigKey) {
+ EXPECT_EQ(
+ heron::config::TopologyConfigHelper::GetRuntimeConfigKey("conf.test1"),
+ "conf.test1:runtime");
+}
+
+TEST(TopologyConfigHelper, ConvertToRuntimeConfigs) {
+ std::map<std::string, std::string> original_config;
+ original_config["conf.test1"] = "a";
+ original_config["conf.test2"] = "b";
+
+ std::map<std::string, std::string> runtime_config;
+
heron::config::TopologyConfigHelper::ConvertToRuntimeConfigs(original_config,
runtime_config);
+
+ EXPECT_EQ(runtime_config["conf.test1:runtime"], "a");
+ EXPECT_EQ(runtime_config["conf.test2:runtime"], "b");
+}
+
int main(int argc, char **argv) {
heron::common::Initialize(argv[0]);
testing::InitGoogleTest(&argc, argv);
diff --git a/heron/stmgr/src/cpp/manager/instance-server.cpp
b/heron/stmgr/src/cpp/manager/instance-server.cpp
index 72887496dd..30e787e060 100644
--- a/heron/stmgr/src/cpp/manager/instance-server.cpp
+++ b/heron/stmgr/src/cpp/manager/instance-server.cpp
@@ -15,9 +15,11 @@
*/
#include "manager/instance-server.h"
+
#include <iostream>
-#include <unordered_set>
+#include <map>
#include <string>
+#include <unordered_set>
#include <vector>
#include "manager/checkpoint-gateway.h"
#include "util/neighbour-calculator.h"
@@ -422,6 +424,9 @@ void InstanceServer::DrainCheckpoint(sp_int32 _task_id,
void InstanceServer::BroadcastNewPhysicalPlan(const
proto::system::PhysicalPlan& _pplan) {
// TODO(vikasr) We do not handle any changes to our local assignment
+ LOG(INFO) << "Broadcasting new PhysicalPlan:";
+ config::TopologyConfigHelper::LogTopology(_pplan.topology());
+
ComputeLocalSpouts(_pplan);
proto::stmgr::NewInstanceAssignmentMessage new_assignment;
new_assignment.mutable_pplan()->CopyFrom(_pplan);
@@ -439,17 +444,18 @@ void InstanceServer::BroadcastNewPhysicalPlan(const
proto::system::PhysicalPlan&
void InstanceServer::SetRateLimit(const proto::system::PhysicalPlan& _pplan,
const std::string& _component,
Connection* _conn) const {
- sp_int64 read_bsp =
+ sp_int64 read_bps =
config::TopologyConfigHelper::GetComponentOutputBPS(_pplan.topology(),
_component);
sp_int32 parallelism =
config::TopologyConfigHelper::GetComponentParallelism(_pplan.topology(),
_component);
- sp_int64 burst_read_bsp = read_bsp + read_bsp / 2;
-
- // There should be parallelism hint and the per instance rate limit should
be at least
- // one byte per second
- if (parallelism > 0 && read_bsp > parallelism && burst_read_bsp >
parallelism) {
- LOG(INFO) << "Set rate limit in " << _component << " to " << read_bsp <<
"/" << burst_read_bsp;
- _conn->setRateLimit(read_bsp / parallelism, burst_read_bsp / parallelism);
+ // burst rate is 1.5 x of regular rate
+ sp_int64 burst_read_bps = read_bps + read_bps / 2;
+
+ LOG(INFO) << "Parallelism of component " << _component << " is " <<
parallelism;
+ LOG(INFO) << "Read BPS of component " << _component << " is " << read_bps;
+ if (parallelism > 0 && read_bps >= 0 && burst_read_bps >= 0) {
+ LOG(INFO) << "Set rate limit in " << _component << " to " << read_bps <<
"/" << burst_read_bps;
+ _conn->setRateLimit(read_bps / parallelism, burst_read_bps / parallelism);
} else {
LOG(INFO) << "Disable rate limit in " << _component;
_conn->disableRateLimit();
diff --git a/heron/stmgr/src/cpp/manager/stmgr.cpp
b/heron/stmgr/src/cpp/manager/stmgr.cpp
index 3fd805b299..738fa037d6 100644
--- a/heron/stmgr/src/cpp/manager/stmgr.cpp
+++ b/heron/stmgr/src/cpp/manager/stmgr.cpp
@@ -538,6 +538,7 @@ void StMgr::StartTMasterClient() {
void StMgr::NewPhysicalPlan(proto::system::PhysicalPlan* _pplan) {
LOG(INFO) << "Received a new physical plan from tmaster";
+ heron::config::TopologyConfigHelper::LogTopology(_pplan->topology());
// first make sure that we are part of the plan ;)
bool found = false;
for (sp_int32 i = 0; i < _pplan->stmgrs_size(); ++i) {
@@ -561,10 +562,10 @@ void StMgr::NewPhysicalPlan(proto::system::PhysicalPlan*
_pplan) {
LOG(INFO) << "Topology state changed from " << pplan_->topology().state()
<< " to "
<< _pplan->topology().state();
}
- proto::api::TopologyState st = _pplan->topology().state();
- _pplan->clear_topology();
- _pplan->mutable_topology()->CopyFrom(*hydrated_topology_);
- _pplan->mutable_topology()->set_state(st);
+
+ PatchPhysicalPlanWithHydratedTopology(_pplan, hydrated_topology_);
+ LOG(INFO) << "Patched with hydrated topology";
+ heron::config::TopologyConfigHelper::LogTopology(_pplan->topology());
// TODO(vikasr) Currently we dont check if our role has changed
@@ -1115,5 +1116,38 @@ void
StMgr::HandleStatefulRestoreDone(proto::system::StatusCode _status,
std::string _checkpoint_id, sp_int64
_restore_txid) {
tmaster_client_->SendRestoreTopologyStateResponse(_status, _checkpoint_id,
_restore_txid);
}
+
+// Patch new physical plan with internal hydrated topology but keep new
topology data:
+// - new topology state
+// - new topology/component config
+void StMgr::PatchPhysicalPlanWithHydratedTopology(proto::system::PhysicalPlan*
_pplan,
+ proto::api::Topology*
_topology) {
+ // Back up new topology data (state and configs)
+ proto::api::TopologyState st = _pplan->topology().state();
+
+ std::map<std::string, std::string> topology_config;
+ config::TopologyConfigHelper::GetTopologyConfig(_pplan->topology(),
topology_config);
+
+ std::unordered_set<std::string> components;
+ std::map<std::string, std::map<std::string, std::string>> component_config;
+ config::TopologyConfigHelper::GetAllComponentNames(_pplan->topology(),
components);
+ for (auto iter = components.begin(); iter != components.end(); ++iter) {
+ std::map<std::string, std::string> config;
+ config::TopologyConfigHelper::GetComponentConfig(_pplan->topology(),
*iter, topology_config);
+ component_config[*iter] = config;
+ }
+
+ // Copy hydrated topology into pplan
+ _pplan->clear_topology();
+ _pplan->mutable_topology()->CopyFrom(*_topology);
+
+ // Restore new topology data
+ _pplan->mutable_topology()->set_state(st);
+ config::TopologyConfigHelper::SetTopologyConfig(_pplan->mutable_topology(),
topology_config);
+ for (auto iter = components.begin(); iter != components.end(); ++iter) {
+
config::TopologyConfigHelper::SetComponentConfig(_pplan->mutable_topology(),
*iter,
+ component_config[*iter]);
+ }
+}
} // namespace stmgr
} // namespace heron
diff --git a/heron/stmgr/src/cpp/manager/stmgr.h
b/heron/stmgr/src/cpp/manager/stmgr.h
index cd0d0da218..48e30be9fe 100644
--- a/heron/stmgr/src/cpp/manager/stmgr.h
+++ b/heron/stmgr/src/cpp/manager/stmgr.h
@@ -187,6 +187,12 @@ class StMgr {
void HandleStatefulRestoreDone(proto::system::StatusCode _status,
std::string _checkpoint_id, sp_int64
_restore_txid);
+ // Patch new physical plan with internal hydrated topology but keep new
topology data:
+ // - new topology state
+ // - new topology/component config
+ static void
PatchPhysicalPlanWithHydratedTopology(proto::system::PhysicalPlan* _pplan,
+ proto::api::Topology*
_topology);
+
heron::common::HeronStateMgr* state_mgr_;
proto::system::PhysicalPlan* pplan_;
sp_string topology_name_;
diff --git a/heron/stmgr/tests/cpp/server/stmgr_unittest.cpp
b/heron/stmgr/tests/cpp/server/stmgr_unittest.cpp
index 86703047d0..41363d91ce 100644
--- a/heron/stmgr/tests/cpp/server/stmgr_unittest.cpp
+++ b/heron/stmgr/tests/cpp/server/stmgr_unittest.cpp
@@ -1867,6 +1867,62 @@ TEST(StMgr, test_metricsmgr_reconnect) {
TearCommonResources(common);
}
+// Test PatchPhysicalPlanWithHydratedTopology function
+TEST(StMgr, test_PatchPhysicalPlanWithHydratedTopology) {
+ int32_t nSpouts = 2;
+ int32_t nSpoutInstances = 1;
+ int32_t nBolts = 3;
+ int32_t nBoltInstances = 1;
+ heron::proto::api::Topology* topology =
+ GenerateDummyTopology("topology_name",
+ "topology_id",
+ nSpouts, nSpoutInstances, nBolts, nBoltInstances,
+ heron::proto::api::SHUFFLE);
+
+ heron::proto::system::PhysicalPlan* pplan = new
heron::proto::system::PhysicalPlan();
+ pplan->mutable_topology()->CopyFrom(*topology);
+
+ // Verify initial values
+ EXPECT_EQ(
+ heron::config::TopologyConfigHelper::GetTopologyConfigValue(
+ *topology,
+ heron::config::TopologyConfigVars::TOPOLOGY_MESSAGE_TIMEOUT_SECS,
+ ""),
+ "30");
+ EXPECT_EQ(
+ heron::config::TopologyConfigHelper::GetTopologyConfigValue(
+ pplan->topology(),
+ heron::config::TopologyConfigVars::TOPOLOGY_MESSAGE_TIMEOUT_SECS,
+ ""),
+ "30");
+ // Change runtime data in PhysicalPlan and patch it
+ std::map<std::string, std::string> update;
+ update["conf.new"] = "test";
+ update[heron::config::TopologyConfigVars::TOPOLOGY_MESSAGE_TIMEOUT_SECS] =
"10";
+
heron::config::TopologyConfigHelper::SetTopologyConfig(pplan->mutable_topology(),
update);
+
+ // Verify updated runtime data is still in the patched physical plan
+ // The topology in the physical plan should have the old name
+ EXPECT_EQ(
+ heron::config::TopologyConfigHelper::GetTopologyConfigValue(
+ *topology,
+ heron::config::TopologyConfigVars::TOPOLOGY_MESSAGE_TIMEOUT_SECS,
+ ""),
+ "30"); // The internal topology object should still have the initial value
+ EXPECT_EQ(
+ heron::config::TopologyConfigHelper::GetTopologyConfigValue(
+ pplan->topology(),
+ heron::config::TopologyConfigVars::TOPOLOGY_MESSAGE_TIMEOUT_SECS,
+ ""),
+ "10"); // The topology object in the physical plan should have the new
value
+ EXPECT_EQ(
+ heron::config::TopologyConfigHelper::GetTopologyConfigValue(
+ pplan->topology(), "conf.new", ""),
+ "test"); // The topology object in the physical plan should have the new
config
+
+ delete pplan;
+}
+
int main(int argc, char** argv) {
heron::common::Initialize(argv[0]);
std::cout << "Current working directory (to find stmgr logs) "
diff --git a/heron/tmaster/src/cpp/manager/tcontroller.cpp
b/heron/tmaster/src/cpp/manager/tcontroller.cpp
index a13784ed0a..d5f933c2b7 100644
--- a/heron/tmaster/src/cpp/manager/tcontroller.cpp
+++ b/heron/tmaster/src/cpp/manager/tcontroller.cpp
@@ -23,6 +23,7 @@
#include <vector>
#include "basics/basics.h"
#include "basics/strutils.h"
+#include "config/topology-config-helper.h"
#include "errors/errors.h"
#include "manager/tmaster.h"
#include "network/network.h"
@@ -298,7 +299,8 @@ bool TController::ParseRuntimeConfig(const
std::vector<std::string>& paramters,
std::vector<std::string> segments = StrUtils::split(*iter, ":");
if (segments.size() == 2) {
// Topology level config
- retval[TOPOLOGY_CONFIG_KEY][segments[0]] = segments[1];
+ const char* topology_key =
config::TopologyConfigHelper::GetReservedTopologyConfigKey();
+ retval[topology_key][segments[0]] = segments[1];
} else if (segments.size() == 3) {
// Component level config
retval[segments[0]][segments[1]] = segments[2];
diff --git a/heron/tmaster/src/cpp/manager/tmaster.cpp
b/heron/tmaster/src/cpp/manager/tmaster.cpp
index ad2affe6cd..c27ccfea4b 100644
--- a/heron/tmaster/src/cpp/manager/tmaster.cpp
+++ b/heron/tmaster/src/cpp/manager/tmaster.cpp
@@ -591,6 +591,9 @@ bool TMaster::UpdateRuntimeConfig(const ComponentConfigMap&
_config,
VCallback<proto::system::StatusCode> cb) {
DCHECK(current_pplan_->topology().IsInitialized());
+ LOG(INFO) << "Update runtime config: ";
+ LogConfig(_config);
+
// Parse and set the new configs
proto::system::PhysicalPlan* new_pplan = new proto::system::PhysicalPlan();
new_pplan->CopyFrom(*current_pplan_);
@@ -631,11 +634,12 @@ bool
TMaster::UpdateRuntimeConfigInTopology(proto::api::Topology* _topology,
DCHECK(_topology->IsInitialized());
ComponentConfigMap::const_iterator iter;
+ const char* topology_key =
config::TopologyConfigHelper::GetReservedTopologyConfigKey();
for (iter = _config.begin(); iter != _config.end(); ++iter) {
// Get config for topology or component.
std::map<std::string, std::string> runtime_config;
- AppendPostfix(iter->second, RUNTIME_CONFIG_POSTFIX, runtime_config);
- if (iter->first == TOPOLOGY_CONFIG_KEY) {
+ config::TopologyConfigHelper::ConvertToRuntimeConfigs(iter->second,
runtime_config);
+ if (iter->first == topology_key) {
config::TopologyConfigHelper::SetTopologyConfig(_topology,
runtime_config);
} else {
config::TopologyConfigHelper::SetComponentConfig(_topology, iter->first,
runtime_config);
@@ -1022,8 +1026,9 @@ bool TMaster::ValidateRuntimeConfigNames(const
ComponentConfigMap& _config) cons
config::TopologyConfigHelper::GetAllComponentNames(topology, components);
ComponentConfigMap::const_iterator iter;
+ const char* topology_key =
config::TopologyConfigHelper::GetReservedTopologyConfigKey();
for (iter = _config.begin(); iter != _config.end(); ++iter) {
- if (iter->first != TOPOLOGY_CONFIG_KEY) {
+ if (iter->first != topology_key) {
// It is a component, search for it
if (components.find(iter->first) == components.end()) {
return false;
@@ -1034,12 +1039,12 @@ bool TMaster::ValidateRuntimeConfigNames(const
ComponentConfigMap& _config) cons
return true;
}
-void TMaster::AppendPostfix(const ConfigValueMap& _origin,
- const std::string& post_fix,
- ConfigValueMap& _retval) {
- ConfigValueMap::const_iterator it;
- for (it = _origin.begin(); it != _origin.end(); ++it) {
- _retval[it->first + post_fix] = it->second;
+void TMaster::LogConfig(const ComponentConfigMap& _config) {
+ for (auto iter = _config.begin(); iter != _config.end(); ++iter) {
+ LOG(INFO) << iter->first << " =>";
+ for (auto i = iter->second.begin(); i != iter->second.end(); ++i) {
+ LOG(INFO) << i->first << " : " << i->second;
+ }
}
}
diff --git a/heron/tmaster/src/cpp/manager/tmaster.h
b/heron/tmaster/src/cpp/manager/tmaster.h
index ca2b0d42a8..defe8fd058 100644
--- a/heron/tmaster/src/cpp/manager/tmaster.h
+++ b/heron/tmaster/src/cpp/manager/tmaster.h
@@ -47,9 +47,6 @@ typedef std::map<std::string, std::string> ConfigValueMap;
// From component name to config/value pairs
typedef std::map<std::string, std::map<std::string, std::string>>
ComponentConfigMap;
-const sp_string TOPOLOGY_CONFIG_KEY = "_topology_";
-const sp_string RUNTIME_CONFIG_POSTFIX = ":runtime";
-
class TMaster {
public:
TMaster(const std::string& _zk_hostport, const std::string& _topology_name,
@@ -124,6 +121,9 @@ class TMaster {
// Function to be called that calls MakePhysicalPlan and sends it to all
stmgrs
void DoPhysicalPlan(EventLoop::Status _code);
+ // Log config object
+ void LogConfig(const ComponentConfigMap& _config);
+
// Big brother function that does the assignment to the workers
// If _new_stmgr is null, this means that there was a plan
// existing, but a _new_stmgr joined us. So redo his part
@@ -192,10 +192,6 @@ class TMaster {
sp_int32 port,
const std::string& stmgr_id);
- void AppendPostfix(const ConfigValueMap& _origin,
- const std::string& post_fix,
- ConfigValueMap& _update);
-
// map of active stmgr id to stmgr state
StMgrMap stmgrs_;
diff --git a/heron/tmaster/tests/cpp/server/tmaster_unittest.cpp
b/heron/tmaster/tests/cpp/server/tmaster_unittest.cpp
index 6db68df233..0f37dc826a 100644
--- a/heron/tmaster/tests/cpp/server/tmaster_unittest.cpp
+++ b/heron/tmaster/tests/cpp/server/tmaster_unittest.cpp
@@ -735,7 +735,8 @@ TEST(StMgr, test_runtime_config) {
std::map<std::string, std::string> validate_good_config;
validate_good_config[topology_runtime_config_1] = "1";
validate_good_config[topology_runtime_config_2] = "2";
- validate_good_config_map[heron::tmaster::TOPOLOGY_CONFIG_KEY] =
validate_good_config;
+ const char* topology_key =
heron::config::TopologyConfigHelper::GetReservedTopologyConfigKey();
+ validate_good_config_map[topology_key] = validate_good_config;
validate_good_config_map["spout1"] = validate_good_config;
EXPECT_EQ(common.tmaster_->ValidateRuntimeConfig(validate_good_config_map),
true);
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services