Code0x58 commented on code in PR #3830:
URL: https://github.com/apache/incubator-heron/pull/3830#discussion_r887689080
##########
heron/tools/tracker/src/python/topology.py:
##########
@@ -241,8 +244,16 @@ def __init__(self, name: str, state_manager_name: str, tracker_config: Config) -
self.id: Optional[int] = None
self.tracker_config: Config = tracker_config
# this maps pb2 structs to structures returned via API endpoints
- # it is repopulated every time one of the pb2 roperties is updated
+ # it is repopulated every time one of the pb2 properties is updated
self.info: Optional[TopologyInfo] = None
+ self.lock = threading.RLock()
+
+ def __eq__(self, o):
+ return isinstance(o, Topology) \
+ and o.name == self.name \
+ and o.state_manager_name == self.state_manager_name \
+ and o.cluster == self.cluster \
+ and o.environ == self.environ
Review Comment:
To be pedantic, you might want some sort of lock that simultaneously locks
both `self` and `o`, e.g. a global lock on the state, to ensure a consistent read
for the comparison, but that is likely over the top here.
Was this introduced for set operations? Otherwise I'm not spotting where
it's being used.
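Purely to illustrate the global-lock idea, a minimal sketch (not the real `Topology` class; `_STATE_LOCK` and the simplified constructor are made up for the example):

```python
import threading

# hypothetical module-level lock shared by readers and writers of topology state
_STATE_LOCK = threading.Lock()

class Topology:
    def __init__(self, name, state_manager_name, cluster, environ):
        self.name = name
        self.state_manager_name = state_manager_name
        self.cluster = cluster
        self.environ = environ

    def __eq__(self, o):
        # both objects are read under the same lock, so neither can be mutated
        # mid-comparison by a writer that also takes _STATE_LOCK
        with _STATE_LOCK:
            return isinstance(o, Topology) \
                and o.name == self.name \
                and o.state_manager_name == self.state_manager_name \
                and o.cluster == self.cluster \
                and o.environ == self.environ
```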
##########
heron/tools/tracker/src/python/tracker.py:
##########
@@ -64,17 +66,18 @@ def sync_topologies(self) -> None:
def on_topologies_watch(state_manager: StateManager, topologies: List[str]) -> None:
"""watch topologies"""
topologies = set(topologies)
- Log.info("State watch triggered for topologies.")
- Log.debug("Topologies: %s", topologies)
- cached_names = {t.name for t in self.get_stmgr_topologies(state_manager.name)}
- Log.debug("Existing topologies: %s", cached_names)
- for name in cached_names - topologies:
- Log.info("Removing topology: %s in rootpath: %s",
- name, state_manager.rootpath)
- self.remove_topology(name, state_manager.name)
-
- for name in topologies - cached_names:
- self.add_new_topology(state_manager, name)
+ Log.info(f"State watch triggered for topologies of {state_manager.name}.")
+ Log.debug(f"Received topologies: {state_manager.name}, {topologies}")
Review Comment:
It's generally recommended to leave logging calls in the `(format_str, *args)`
form, as it avoids unnecessary stringification and interpolation when the
message level is below the configured logging level, so it would be good
to change back to that format.
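i.e. roughly:

```python
# lazy %-style formatting: the arguments are only interpolated into the
# message if the record is actually emitted at the configured level
Log.info("State watch triggered for topologies of %s.", state_manager.name)
Log.debug("Received topologies: %s, %s", state_manager.name, topologies)
```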
##########
heron/tools/tracker/src/python/topology.py:
##########
@@ -589,30 +601,31 @@ def _update(
scheduler_location=...,
) -> None:
"""Atomically update this instance to avoid inconsistent reads/writes from
other threads."""
Review Comment:
If there were only one topology per thread, then I don't think the lock would have
an impact, but that's a big _if_ given I can't recall the threading structure.
If topologies are shared across threads, you'd probably also want the lock for a
consistent read, e.g. while doing a comparison in `__eq__`.
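For example, a consistent-read comparison using the per-instance `self.lock` from this PR might look like the following sketch (the `id()`-based ordering is only there to avoid a potential deadlock; illustrative, not a concrete suggestion):

```python
def __eq__(self, o):
    if not isinstance(o, Topology):
        return False
    # take both instance locks in a stable order so two threads running
    # a == b and b == a concurrently cannot deadlock; RLock also makes
    # the degenerate self == self case safe
    first, second = sorted((self, o), key=id)
    with first.lock, second.lock:
        return o.name == self.name \
            and o.state_manager_name == self.state_manager_name \
            and o.cluster == self.cluster \
            and o.environ == self.environ
```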
##########
heron/tools/tracker/src/python/tracker.py:
##########
@@ -100,11 +103,15 @@ def get_topology(
and t.cluster == cluster
and (not role or t.execution_state.role == role)
and t.environ == environ]
- if len(topologies) != 1:
+ if len(topologies) == 0:
Review Comment:
As this is a behaviour change, is it known to rectify anything? Now it warns
if there is more than one matching topology, and I wonder if that is a bug. I don't
recall whether this would cause issues (without reading around a lot), so I'm not sure
it should be included unless it is a known improvement.
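If the stricter behaviour is worth keeping, a hypothetical sketch of handling the two cases separately (the exception types and messages are made up, not the tracker's actual error handling):

```python
if not topologies:
    # the "not found" case the PR now reports separately
    raise KeyError(f"no matching topology in {cluster}/{environ}")
if len(topologies) > 1:
    # keep the old strictness rather than only warning on an ambiguous match
    raise ValueError(f"multiple matching topologies in {cluster}/{environ}")
return topologies[0]
```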
##########
heron/tools/tracker/src/python/tracker.py:
##########
@@ -64,17 +66,18 @@ def sync_topologies(self) -> None:
def on_topologies_watch(state_manager: StateManager, topologies: List[str]) -> None:
"""watch topologies"""
topologies = set(topologies)
- Log.info("State watch triggered for topologies.")
- Log.debug("Topologies: %s", topologies)
- cached_names = {t.name for t in self.get_stmgr_topologies(state_manager.name)}
- Log.debug("Existing topologies: %s", cached_names)
- for name in cached_names - topologies:
- Log.info("Removing topology: %s in rootpath: %s",
- name, state_manager.rootpath)
- self.remove_topology(name, state_manager.name)
-
- for name in topologies - cached_names:
- self.add_new_topology(state_manager, name)
+ Log.info(f"State watch triggered for topologies of {state_manager.name}.")
+ Log.debug(f"Received topologies: {state_manager.name}, {topologies}")
+ cached_names = [t.name for t in self.get_stmgr_topologies(state_manager.name)]
Review Comment:
Was there a reason to convert this from a set to a list? The code below
still ends up doing what are essentially set operations, just implicitly and
more slowly.
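e.g. the previous set-based form keeps the differences explicit and the membership checks O(1):

```python
# set comprehension, as in the code being replaced
cached_names = {t.name for t in self.get_stmgr_topologies(state_manager.name)}
# set differences give the "gone" and "new" topologies directly
for name in cached_names - topologies:
    self.remove_topology(name, state_manager.name)
for name in topologies - cached_names:
    self.add_new_topology(state_manager, name)
```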
##########
heron/tools/tracker/src/python/tracker.py:
##########
@@ -114,37 +121,37 @@ def get_stmgr_topologies(self, name: str) -> List[Any]:
"""
return [t for t in self.topologies if t.state_manager_name == name]
- def add_new_topology(self, state_manager, topology_name: str) -> None:
+ def add_new_topology(self, state_manager: StateManager, topology_name: str) -> None:
"""
Adds a topology in the local cache, and sets a watch
on any changes on the topology.
"""
topology = Topology(topology_name, state_manager.name, self.config)
- Log.info("Adding new topology: %s, state_manager: %s",
- topology_name, state_manager.name)
- # populate the cache before making it addressable in the topologies to
- # avoid races due to concurrent execution
- self.topologies.append(topology)
-
- # Set watches on the pplan, execution_state, tmanager and scheduler_location.
- state_manager.get_pplan(topology_name, topology.set_physical_plan)
- state_manager.get_packing_plan(topology_name, topology.set_packing_plan)
- state_manager.get_execution_state(topology_name, topology.set_execution_state)
- state_manager.get_tmanager(topology_name, topology.set_tmanager)
- state_manager.get_scheduler_location(topology_name, topology.set_scheduler_location)
+ with self.lock:
Review Comment:
I guess `state_manager.name` can't change and `self.config` can't change.
##########
heron/tools/tracker/src/python/tracker.py:
##########
@@ -114,37 +121,37 @@ def get_stmgr_topologies(self, name: str) -> List[Any]:
"""
return [t for t in self.topologies if t.state_manager_name == name]
- def add_new_topology(self, state_manager, topology_name: str) -> None:
+ def add_new_topology(self, state_manager: StateManager, topology_name: str) -> None:
"""
Adds a topology in the local cache, and sets a watch
on any changes on the topology.
"""
topology = Topology(topology_name, state_manager.name, self.config)
- Log.info("Adding new topology: %s, state_manager: %s",
- topology_name, state_manager.name)
- # populate the cache before making it addressable in the topologies to
- # avoid races due to concurrent execution
- self.topologies.append(topology)
-
- # Set watches on the pplan, execution_state, tmanager and scheduler_location.
- state_manager.get_pplan(topology_name, topology.set_physical_plan)
- state_manager.get_packing_plan(topology_name, topology.set_packing_plan)
- state_manager.get_execution_state(topology_name, topology.set_execution_state)
- state_manager.get_tmanager(topology_name, topology.set_tmanager)
- state_manager.get_scheduler_location(topology_name, topology.set_scheduler_location)
+ with self.lock:
+ if topology not in self.topologies:
+ Log.info(f"Adding new topology: {topology_name}, state_manager: {state_manager.name}")
+ self.topologies.append(topology)
+
+ # Set watches on the pplan, execution_state, tmanager and scheduler_location.
+ state_manager.get_pplan(topology_name, topology.set_physical_plan)
+ state_manager.get_packing_plan(topology_name, topology.set_packing_plan)
+ state_manager.get_execution_state(topology_name, topology.set_execution_state)
+ state_manager.get_tmanager(topology_name, topology.set_tmanager)
+ state_manager.get_scheduler_location(topology_name, topology.set_scheduler_location)
Review Comment:
I suppose here you might want a lock on the `state_manager` for a consistent
read, unless there's a lock or immutability somewhere else keeping things
consistent.
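Illustratively only (nothing like `watch_lock` exists on `StateManager` today; it's a made-up attribute):

```python
# hypothetical: serialise the watch registrations against concurrent
# changes to the state manager so the five reads see a consistent view
with state_manager.watch_lock:
    state_manager.get_pplan(topology_name, topology.set_physical_plan)
    state_manager.get_packing_plan(topology_name, topology.set_packing_plan)
    state_manager.get_execution_state(topology_name, topology.set_execution_state)
    state_manager.get_tmanager(topology_name, topology.set_tmanager)
    state_manager.get_scheduler_location(topology_name, topology.set_scheduler_location)
```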
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]