Code0x58 commented on code in PR #3830:
URL: https://github.com/apache/incubator-heron/pull/3830#discussion_r887689080


##########
heron/tools/tracker/src/python/topology.py:
##########
@@ -241,8 +244,16 @@ def __init__(self, name: str, state_manager_name: str, 
tracker_config: Config) -
     self.id: Optional[int] = None
     self.tracker_config: Config = tracker_config
     # this maps pb2 structs to structures returned via API endpoints
-    # it is repopulated every time one of the pb2 roperties is updated
+    # it is repopulated every time one of the pb2 properties is updated
     self.info: Optional[TopologyInfo] = None
+    self.lock = threading.RLock()
+
+  def __eq__(self, o):
+    return isinstance(o, Topology) \
+           and o.name == self.name \
+           and o.state_manager_name == self.state_manager_name \
+           and o.cluster == self.cluster \
+           and o.environ == self.environ

Review Comment:
   to be pedantic, you might want some sort of lock that simultaneously locks 
both `self` and `o`, e.g. some global lock on state to ensure a consistent read 
for comparison, but that is likely over-the-top for this.
   
   was this introduced for set operations, otherwise I'm not spotting where 
this is being used.



##########
heron/tools/tracker/src/python/tracker.py:
##########
@@ -64,17 +66,18 @@ def sync_topologies(self) -> None:
     def on_topologies_watch(state_manager: StateManager, topologies: 
List[str]) -> None:
       """watch topologies"""
       topologies = set(topologies)
-      Log.info("State watch triggered for topologies.")
-      Log.debug("Topologies: %s", topologies)
-      cached_names = {t.name for t in 
self.get_stmgr_topologies(state_manager.name)}
-      Log.debug("Existing topologies: %s", cached_names)
-      for name in cached_names - topologies:
-        Log.info("Removing topology: %s in rootpath: %s",
-                 name, state_manager.rootpath)
-        self.remove_topology(name, state_manager.name)
-
-      for name in topologies - cached_names:
-        self.add_new_topology(state_manager, name)
+      Log.info(f"State watch triggered for topologies of 
{state_manager.name}.")
+      Log.debug(f"Received topologies: {state_manager.name}, {topologies}")

Review Comment:
   it's generally recommended to leave the logging in the form `(format_str, 
*args)`, as it avoids unnecessary stringification and interpolation (i.e. when 
the message level is lower than the configured logging level), so would be good 
to change back to that format



##########
heron/tools/tracker/src/python/topology.py:
##########
@@ -589,30 +601,31 @@ def _update(
       scheduler_location=...,
     ) -> None:
     """Atomically update this instance to avoid inconsistent reads/writes from 
other threads."""

Review Comment:
   If there was one topology per thread, then I don't think the lock would have 
an impact, but that's a big _if_ given I can't recall the threading structure. 
If that was the case, you'd probably want the lock for a consistent read, e.g. 
while doing a comparison/`__eq__`.



##########
heron/tools/tracker/src/python/tracker.py:
##########
@@ -100,11 +103,15 @@ def get_topology(
                   and t.cluster == cluster
                   and (not role or t.execution_state.role == role)
                   and t.environ == environ]
-    if len(topologies) != 1:
+    if len(topologies) == 0:

Review Comment:
   as this is a behaviour change, is it known to rectify anything? now it warns 
if there is more than one topology, but not I wonder if that is a but. I don't 
recall if this would cause issues (without reading around a lot), so not sure 
it should be included unless it is a known improvement



##########
heron/tools/tracker/src/python/tracker.py:
##########
@@ -64,17 +66,18 @@ def sync_topologies(self) -> None:
     def on_topologies_watch(state_manager: StateManager, topologies: 
List[str]) -> None:
       """watch topologies"""
       topologies = set(topologies)
-      Log.info("State watch triggered for topologies.")
-      Log.debug("Topologies: %s", topologies)
-      cached_names = {t.name for t in 
self.get_stmgr_topologies(state_manager.name)}
-      Log.debug("Existing topologies: %s", cached_names)
-      for name in cached_names - topologies:
-        Log.info("Removing topology: %s in rootpath: %s",
-                 name, state_manager.rootpath)
-        self.remove_topology(name, state_manager.name)
-
-      for name in topologies - cached_names:
-        self.add_new_topology(state_manager, name)
+      Log.info(f"State watch triggered for topologies of 
{state_manager.name}.")
+      Log.debug(f"Received topologies: {state_manager.name}, {topologies}")
+      cached_names = [t.name for t in 
self.get_stmgr_topologies(state_manager.name)]

Review Comment:
   was there a reason to convert this from a set to a list? the code below 
still ends up doing what are essentially set operations, just implicitly and 
slower



##########
heron/tools/tracker/src/python/tracker.py:
##########
@@ -114,37 +121,37 @@ def get_stmgr_topologies(self, name: str) -> List[Any]:
     """
     return [t for t in self.topologies if t.state_manager_name == name]
 
-  def add_new_topology(self, state_manager, topology_name: str) -> None:
+  def add_new_topology(self, state_manager: StateManager, topology_name: str) 
-> None:
     """
     Adds a topology in the local cache, and sets a watch
     on any changes on the topology.
     """
     topology = Topology(topology_name, state_manager.name, self.config)
-    Log.info("Adding new topology: %s, state_manager: %s",
-             topology_name, state_manager.name)
-    # populate the cache before making it addressable in the topologies to
-    # avoid races due to concurrent execution
-    self.topologies.append(topology)
-
-    # Set watches on the pplan, execution_state, tmanager and 
scheduler_location.
-    state_manager.get_pplan(topology_name, topology.set_physical_plan)
-    state_manager.get_packing_plan(topology_name, topology.set_packing_plan)
-    state_manager.get_execution_state(topology_name, 
topology.set_execution_state)
-    state_manager.get_tmanager(topology_name, topology.set_tmanager)
-    state_manager.get_scheduler_location(topology_name, 
topology.set_scheduler_location)
+    with self.lock:

Review Comment:
   I guess `state_manager.name` can't change and `self.config` can't change



##########
heron/tools/tracker/src/python/tracker.py:
##########
@@ -114,37 +121,37 @@ def get_stmgr_topologies(self, name: str) -> List[Any]:
     """
     return [t for t in self.topologies if t.state_manager_name == name]
 
-  def add_new_topology(self, state_manager, topology_name: str) -> None:
+  def add_new_topology(self, state_manager: StateManager, topology_name: str) 
-> None:
     """
     Adds a topology in the local cache, and sets a watch
     on any changes on the topology.
     """
     topology = Topology(topology_name, state_manager.name, self.config)
-    Log.info("Adding new topology: %s, state_manager: %s",
-             topology_name, state_manager.name)
-    # populate the cache before making it addressable in the topologies to
-    # avoid races due to concurrent execution
-    self.topologies.append(topology)
-
-    # Set watches on the pplan, execution_state, tmanager and 
scheduler_location.
-    state_manager.get_pplan(topology_name, topology.set_physical_plan)
-    state_manager.get_packing_plan(topology_name, topology.set_packing_plan)
-    state_manager.get_execution_state(topology_name, 
topology.set_execution_state)
-    state_manager.get_tmanager(topology_name, topology.set_tmanager)
-    state_manager.get_scheduler_location(topology_name, 
topology.set_scheduler_location)
+    with self.lock:
+      if topology not in self.topologies:
+        Log.info(f"Adding new topology: {topology_name}, state_manager: 
{state_manager.name}")
+        self.topologies.append(topology)
+
+      # Set watches on the pplan, execution_state, tmanager and 
scheduler_location.
+      state_manager.get_pplan(topology_name, topology.set_physical_plan)
+      state_manager.get_packing_plan(topology_name, topology.set_packing_plan)
+      state_manager.get_execution_state(topology_name, 
topology.set_execution_state)
+      state_manager.get_tmanager(topology_name, topology.set_tmanager)
+      state_manager.get_scheduler_location(topology_name, 
topology.set_scheduler_location)

Review Comment:
   I suppose here you might want a lock on the `state_manager` for a consistent 
read, unless there's lock or immutability somewhere else to keep things 
consistent



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to