This is an automated email from the ASF dual-hosted git repository.
nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new 912d84b Allow interpolate more spout config parameters in extra links (#3310)
912d84b is described below
commit 912d84bd4a0fc1fd095d2493a2af2745a3f93223
Author: Xiaoyao Qian <[email protected]>
AuthorDate: Wed Jul 10 11:52:47 2019 -0700
Allow interpolate more spout config parameters in extra links (#3310)
* Allow interpolate more spout config parameters in extra links
* new interpolate design
---
.../config/src/yaml/tracker/heron_tracker.yaml | 14 -------
heron/tools/tracker/src/python/config.py | 37 +++++--------------
heron/tools/tracker/src/python/tracker.py | 43 +++++++++-------------
3 files changed, 27 insertions(+), 67 deletions(-)
diff --git a/heron/tools/config/src/yaml/tracker/heron_tracker.yaml b/heron/tools/config/src/yaml/tracker/heron_tracker.yaml
index 15f87c8..1d8cf5d 100644
--- a/heron/tools/config/src/yaml/tracker/heron_tracker.yaml
+++ b/heron/tools/config/src/yaml/tracker/heron_tracker.yaml
@@ -69,17 +69,3 @@ statemgrs:
# formatter: "http://127.0.0.1/viz/${ENVIRON}-${CLUSTER}-${TOPOLOGY}"
# - name: "Alerts"
# formatter: "http://127.0.0.1/alerts/${ENVIRON}-${CLUSTER}-${TOPOLOGY}"
-#
-# spout.extra.links:
-# - spout.type: "kafka"
-# extra.links:
-# - name: "Viz"
-# formatter: "http://127.0.0.1/kafka/viz/${ENVIRON}-${CLUSTER}-${TOPOLOGY}-${SPOUT_NAME}-${SPOUT_SOURCE}"
-# - name: "Alerts"
-# formatter: "http://127.0.0.1/kafka/alerts/${ENVIRON}-${CLUSTER}-${TOPOLOGY}-${SPOUT_NAME}-${SPOUT_SOURCE}"
-# - spout.type: "default"
-# extra.links:
-# - name: "Viz"
-# formatter: "http://127.0.0.1/default/viz/${ENVIRON}-${CLUSTER}-${TOPOLOGY}-${SPOUT_NAME}-${SPOUT_SOURCE}"
-# - name: "Alerts"
-# formatter: "http://127.0.0.1/default/alerts/${ENVIRON}-${CLUSTER}-${TOPOLOGY}-${SPOUT_NAME}-${SPOUT_SOURCE}"
diff --git a/heron/tools/tracker/src/python/config.py b/heron/tools/tracker/src/python/config.py
index 6dbfc3d..08887b8 100644
--- a/heron/tools/tracker/src/python/config.py
+++ b/heron/tools/tracker/src/python/config.py
@@ -27,8 +27,6 @@ EXTRA_LINKS_KEY = "extra.links"
EXTRA_LINK_NAME_KEY = "name"
EXTRA_LINK_FORMATTER_KEY = "formatter"
EXTRA_LINK_URL_KEY = "url"
-SPOUT_EXTRA_LINKS_KEY = "spout.extra.links"
-SPOUT_TYPE_KEY = "spout.type"
class Config(object):
"""
@@ -40,11 +38,9 @@ class Config(object):
self.configs = configs
self.statemgr_config = StateMgrConfig()
self.extra_links = []
- self.spout_extra_links = {}
self.load_configs()
- # pylint: disable=line-too-long
def load_configs(self):
"""load config files"""
self.statemgr_config.set_state_locations(self.configs[STATEMGRS_KEY])
@@ -52,10 +48,6 @@ class Config(object):
for extra_link in self.configs[EXTRA_LINKS_KEY]:
self.extra_links.append(self.validate_extra_link(extra_link))
- if SPOUT_EXTRA_LINKS_KEY in self.configs:
- for extra_link in self.configs[SPOUT_EXTRA_LINKS_KEY]:
- self.spout_extra_links[extra_link[SPOUT_TYPE_KEY]] = [self.validate_extra_link(link) for link in extra_link[EXTRA_LINKS_KEY]]
-
def validate_extra_link(self, extra_link):
"""validate extra link"""
if EXTRA_LINK_NAME_KEY not in extra_link or EXTRA_LINK_FORMATTER_KEY not in extra_link:
@@ -77,8 +69,6 @@ class Config(object):
"${TOPOLOGY}": "topology",
"${ROLE}": "role",
"${USER}": "user",
- "${SPOUT_NAME}": "spout_name",
- "${SPOUT_SOURCE}": "spout_source",
}
dummy_formatted_url = url_format
for key, value in valid_parameters.items():
@@ -91,34 +81,25 @@ class Config(object):
# No error is thrown, so the format is valid.
return url_format
- def get_formatted_url(self, formatter, execution_state, **additional):
+ def get_formatted_url(self, formatter, execution_state):
"""
+ @param formatter: The template string to interpolate
@param execution_state: The python dict representing JSON execution_state
- @param additional: additional kwargs to interpolate
@return Formatted viz url
"""
# Create the parameters based on execution state
- valid_parameters = {
- "${CLUSTER}": execution_state.get("cluster",
- additional.get("cluster", "${CLUSTER}")),
- "${ENVIRON}": execution_state.get("environ",
- additional.get("environ", "${ENVIRON}")),
- "${TOPOLOGY}": execution_state.get("jobname",
- additional.get("jobname", "${TOPOLOGY}")),
- "${ROLE}": execution_state.get("role",
- additional.get("role", "${ROLE}")),
- "${USER}": execution_state.get("submission_user",
- additional.get("submission_user", "${USER}")),
- "${SPOUT_NAME}": execution_state.get("spout.name",
- additional.get("spout.name", "${SPOUT_NAME}")),
- "${SPOUT_SOURCE}": execution_state.get("spout.source",
- additional.get("spout.source", "${SPOUT_SOURCE}")),
+ common_parameters = {
+ "${CLUSTER}": execution_state.get("cluster", "${CLUSTER}"),
+ "${ENVIRON}": execution_state.get("environ", "${ENVIRON}"),
+ "${TOPOLOGY}": execution_state.get("jobname", "${TOPOLOGY}"),
+ "${ROLE}": execution_state.get("role", "${ROLE}"),
+ "${USER}": execution_state.get("submission_user", "${USER}"),
}
formatted_url = formatter
- for key, value in valid_parameters.items():
+ for key, value in common_parameters.items():
formatted_url = formatted_url.replace(key, value)
return formatted_url
diff --git a/heron/tools/tracker/src/python/tracker.py b/heron/tools/tracker/src/python/tracker.py
index 77be6a2..3ce814d 100644
--- a/heron/tools/tracker/src/python/tracker.py
+++ b/heron/tools/tracker/src/python/tracker.py
@@ -386,25 +386,6 @@ class Tracker(object):
"bolts": {},
}
- # Pre-render component extra links with general params
- execution_state = topology.execution_state
- executionState = {
- "cluster": execution_state.cluster,
- "environ": execution_state.environ,
- "role": execution_state.role,
- "jobname": topology.name,
- "submission_user": execution_state.submission_user,
- }
-
- spout_extra_links = {}
- for spout_type, extra_links in self.config.spout_extra_links.items():
- spout_extra_links[spout_type] = []
- for extra_link in extra_links:
- link = extra_link.copy()
- link[EXTRA_LINK_URL_KEY] = self.config.get_formatted_url(link[EXTRA_LINK_FORMATTER_KEY],
- executionState)
- spout_extra_links[spout_type].append(link)
-
# Add spouts.
for spout in topology.spouts():
spoutName = spout.comp.name
@@ -412,6 +393,7 @@ class Tracker(object):
spoutSource = "NA"
spoutVersion = "NA"
spoutConfigs = spout.comp.config.kvs
+ spoutExtraLinks = []
for kvs in spoutConfigs:
if kvs.key == "spout.type":
spoutType = javaobj.loads(kvs.serialized_value)
@@ -419,20 +401,31 @@ class Tracker(object):
spoutSource = javaobj.loads(kvs.serialized_value)
elif kvs.key == "spout.version":
spoutVersion = javaobj.loads(kvs.serialized_value)
+ elif kvs.key == "extra.links":
+ spoutExtraLinks = json.loads(javaobj.loads(kvs.serialized_value))
+
spoutPlan = {
"config": convert_pb_kvs(spoutConfigs, include_non_primitives=False),
"type": spoutType,
"source": spoutSource,
"version": spoutVersion,
"outputs": [],
- "extra_links": [],
+ "extra_links": spoutExtraLinks,
}
- for extra_link in spout_extra_links.get(spoutType, []):
- extra_link[EXTRA_LINK_URL_KEY] = self.config.get_formatted_url(
- extra_link[EXTRA_LINK_URL_KEY],
- spoutPlan["config"], **{"spout.name": spoutName})
- spoutPlan["extra_links"].append(extra_link)
+ # render component extra links with general params
+ execution_state = topology.execution_state
+ executionState = {
+ "cluster": execution_state.cluster,
+ "environ": execution_state.environ,
+ "role": execution_state.role,
+ "jobname": topology.name,
+ "submission_user": execution_state.submission_user,
+ }
+
+ for link in spoutPlan["extra_links"]:
+ link[EXTRA_LINK_URL_KEY] = self.config.get_formatted_url(link[EXTRA_LINK_FORMATTER_KEY],
+ executionState)
for outputStream in list(spout.outputs):
spoutPlan["outputs"].append({