Taragolis commented on code in PR #39650:
URL: https://github.com/apache/airflow/pull/39650#discussion_r1602156245


##########
airflow/task/task_runner/standard_task_runner.py:
##########
@@ -186,3 +193,12 @@ def get_process_pid(self) -> int:
         if self.process is None:
             raise RuntimeError("Process is not started yet")
         return self.process.pid
+
+    def _read_task_utilization(self):
+        while True:
+            dag_id = self._task_instance.dag_id
+            task_id = self._task_instance.task_id

Review Comment:
   I guess we could grab these outside of the loop, since `dag_id` and `task_id` don't change between iterations.
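A minimal sketch of the idea. `TaskInstanceStub` is a hypothetical stand-in for `self._task_instance`, and the hypothetical `sample` callable stands in for the psutil calls. The loop is bounded by `iterations` only so it can run in isolation; the PR uses `while True`.

```python
import time
from dataclasses import dataclass
from typing import Callable, List, Tuple


@dataclass(frozen=True)
class TaskInstanceStub:
    """Hypothetical stand-in for the runner's `self._task_instance`."""
    dag_id: str
    task_id: str


def read_task_utilization(
    ti: TaskInstanceStub,
    sample: Callable[[], float],
    iterations: int,
    interval: float = 1.0,
) -> List[Tuple[str, str, float]]:
    # dag_id/task_id are fixed for the lifetime of the task, so look them up once
    dag_id = ti.dag_id
    task_id = ti.task_id
    records = []
    for _ in range(iterations):  # bounded here; the PR loops forever
        records.append((dag_id, task_id, sample()))
        time.sleep(interval)
    return records
```

Only the per-iteration work (the actual measurement) stays inside the loop.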



##########
airflow/task/task_runner/standard_task_runner.py:
##########
@@ -186,3 +193,12 @@ def get_process_pid(self) -> int:
         if self.process is None:
             raise RuntimeError("Process is not started yet")
         return self.process.pid
+
+    def _read_task_utilization(self):
+        while True:
+            dag_id = self._task_instance.dag_id
+            task_id = self._task_instance.task_id
+            mem_usage = self.process.memory_percent()
+            cpu_usage = self.process.cpu_percent(interval=1)

Review Comment:
   It might also be a good idea to grab these values inside the `oneshot` context manager, which speeds up retrieving several process metrics at once:
   https://psutil.readthedocs.io/en/latest/#psutil.Process.oneshot
   
   In addition, it might be a good idea to catch the [NoSuchProcess](https://psutil.readthedocs.io/en/latest/#psutil.NoSuchProcess) and [AccessDenied](https://psutil.readthedocs.io/en/latest/#psutil.AccessDenied) exceptions.
   
   I collected an example in a single snippet:
   
   ```python
   import multiprocessing as mp
   import threading
   import time
   from string import ascii_letters
   
   import psutil
   
   
   def _read_task_utilization(pid: int):
       proc = psutil.Process(pid=pid)
       while True:
           try:
               with proc.oneshot():
                   mem = proc.memory_percent()
                   cpu = proc.cpu_percent()
           except (psutil.NoSuchProcess, psutil.AccessDenied) as ex:
               print(f"Whooops! {type(ex).__name__}: {ex}")
           else:
               print(f"Memory %: {mem}")
               print(f"CPU %: {cpu}")
           time.sleep(1)
   
   
   def some_expensive_calculation():
       eat_my_memory = ""
       for char in ascii_letters:
           eat_my_memory += char * 1_000_000
           _ = reversed(eat_my_memory)
           time.sleep(0.1)
   
   
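   def main():
       # Alternative entry point (not called below): monitor the current process itself
       log_reader = threading.Thread(
           target=_read_task_utilization, args=(mp.current_process().pid,)
       )
       log_reader.daemon = True
       log_reader.start()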
   
       start = time.monotonic()
       some_expensive_calculation()
       print(f"Total Time: {time.monotonic() - start}")
   
   
   if __name__ == "__main__":
       mp_context = mp.get_context("fork")
       process = mp_context.Process(target=some_expensive_calculation)
       process.start()
   
       resource_monitor = threading.Thread(
           target=_read_task_utilization, args=(process.pid,)
       )
       resource_monitor.daemon = True
       resource_monitor.start()
       process.join()
   
       time.sleep(3)  # Emulate extra handling time after the process has finished
   ```
   
   ```console
   Memory %: 0.025844573974609375
   CPU %: 0.0
   Memory %: 0.3806114196777344
   CPU %: 1.8
   Memory %: 1.0632514953613281
   CPU %: 6.3
   Memory %: 1.7674446105957031
   CPU %: 9.9
   Memory %: 1.5843391418457031
   CPU %: 11.6
   Memory %: 2.2841453552246094
   CPU %: 12.7
   Whooops! NoSuchProcess: process no longer exists (pid=78454)
   Whooops! NoSuchProcess: process no longer exists (pid=78454)
   Whooops! NoSuchProcess: process no longer exists (pid=78454)
   ```
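The output above shows two small follow-ups. Once the process is gone, the loop keeps reporting `NoSuchProcess` until the daemon thread dies with the interpreter. Also, the first `CPU %` reading is `0.0`, because psutil's non-blocking `cpu_percent()` returns a meaningless 0.0 on its first call. A rough sketch, not the PR's implementation, that stops sampling when the process can no longer be inspected and supports a clean shutdown via `threading.Event`. `monitor`, `sample`, `stop` and `gone` are hypothetical names. Sampling is injected as a callable so the loop can be exercised without a real process. Treating `AccessDenied` as terminal is a design choice.

```python
import threading
from typing import Callable, List, Tuple, Type

Sample = Tuple[float, float]  # (memory %, CPU %)


def monitor(
    sample: Callable[[], Sample],
    stop: threading.Event,
    gone: Tuple[Type[BaseException], ...],
    interval: float = 1.0,
) -> List[Sample]:
    """Collect samples until `stop` is set or `sample()` raises one of `gone`."""
    samples: List[Sample] = []
    while not stop.is_set():
        try:
            samples.append(sample())
        except gone:
            # The process exited (or can no longer be inspected): retrying won't help
            break
        # Like time.sleep(interval), but returns early as soon as `stop` is set
        stop.wait(interval)
    return samples


# Possible wiring with psutil (not executed here):
#   proc = psutil.Process(pid)
#   proc.cpu_percent()  # prime it: the first non-blocking call returns a meaningless 0.0
#
#   def sample():
#       with proc.oneshot():
#           return proc.memory_percent(), proc.cpu_percent()
#
#   monitor(sample, stop_event, gone=(psutil.NoSuchProcess, psutil.AccessDenied))
```

Using `stop.wait(interval)` instead of `time.sleep(interval)` lets the runner end monitoring right after the task finishes rather than relying on a daemon thread being killed at exit.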



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
