Taragolis commented on code in PR #39650:
URL: https://github.com/apache/airflow/pull/39650#discussion_r1602156245
##########
airflow/task/task_runner/standard_task_runner.py:
##########
@@ -186,3 +193,12 @@ def get_process_pid(self) -> int:
if self.process is None:
raise RuntimeError("Process is not started yet")
return self.process.pid
+
+ def _read_task_utilization(self):
+ while True:
+ dag_id = self._task_instance.dag_id
+ task_id = self._task_instance.task_id
Review Comment:
I think we could read these outside of the loop, since the DAG and task IDs do not change between iterations.
##########
airflow/task/task_runner/standard_task_runner.py:
##########
@@ -186,3 +193,12 @@ def get_process_pid(self) -> int:
if self.process is None:
raise RuntimeError("Process is not started yet")
return self.process.pid
+
+ def _read_task_utilization(self):
+ while True:
+ dag_id = self._task_instance.dag_id
+ task_id = self._task_instance.task_id
+ mem_usage = self.process.memory_percent()
+ cpu_usage = self.process.cpu_percent(interval=1)
Review Comment:
It might also be a good idea to read these values inside the `oneshot` context manager:
https://psutil.readthedocs.io/en/latest/#psutil.Process.oneshot
In addition, it might be a good idea to catch the
[NoSuchProcess](https://psutil.readthedocs.io/en/latest/#psutil.NoSuchProcess)
and
[AccessDenied](https://psutil.readthedocs.io/en/latest/#psutil.AccessDenied)
exceptions.
I've collected an example in a single snippet:
```python
import multiprocessing as mp
import os
import threading
import time
from string import ascii_letters

import psutil
def _read_task_utilization(pid: int):
    """Print memory and CPU utilization of process ``pid`` roughly once per second.

    Monitoring stops as soon as the process can no longer be inspected: either
    it has exited (``psutil.NoSuchProcess``) or this process lacks permission
    to read its stats (``psutil.AccessDenied``).
    """
    try:
        # Constructing Process() can already raise NoSuchProcess if the
        # child exited before the monitor thread got scheduled.
        proc = psutil.Process(pid=pid)
        # A non-blocking cpu_percent() measures against the previous call, so
        # the very first call only sets a baseline and returns a meaningless 0.0.
        proc.cpu_percent()
    except (psutil.NoSuchProcess, psutil.AccessDenied) as ex:
        print(f"Whooops! {type(ex).__name__}: {ex}")
        return
    while True:
        # Sleep first so cpu_percent() always has a non-empty interval to measure.
        time.sleep(1)
        try:
            # oneshot() lets psutil fetch the underlying process info once and
            # serve both metrics from that cached read.
            with proc.oneshot():
                mem = proc.memory_percent()
                cpu = proc.cpu_percent()
        except (psutil.NoSuchProcess, psutil.AccessDenied) as ex:
            # Neither condition recovers on retry; stop instead of logging the
            # same error every second until the interpreter exits.
            print(f"Whooops! {type(ex).__name__}: {ex}")
            return
        print(f"Memory %: {mem}")
        print(f"CPU %: {cpu}")
def some_expensive_calculation(chunk_size: int = 1_000_000, delay: float = 0.1):
    """Burn CPU and grow memory step by step so there is something to monitor.

    For each ASCII letter, a ``chunk_size``-long run of that letter is appended
    to a growing string. A reversed copy of the whole string is then built, and
    the function sleeps for ``delay`` seconds.

    The defaults reproduce the original demo workload.
    """
    eat_my_memory = ""
    for char in ascii_letters:
        eat_my_memory += char * chunk_size
        # reversed() alone only creates a lazy iterator and does no work;
        # slicing actually materializes a reversed copy of the whole string.
        _ = eat_my_memory[::-1]
        time.sleep(delay)
def main():
    """Run the expensive calculation in-process while a thread monitors this process."""
    # The monitor needs a PID; without args the thread would die immediately
    # with a TypeError for the missing ``pid`` argument.
    log_reader = threading.Thread(target=_read_task_utilization, args=(os.getpid(),))
    # Daemon thread: the monitor keeps looping while this process is alive, so it
    # must not keep the interpreter from exiting.
    log_reader.daemon = True
    log_reader.start()
    start = time.monotonic()
    some_expensive_calculation()
    print(f"Total Time: {time.monotonic() - start}")
if __name__ == "__main__":
    # The "fork" start method is POSIX-only; get_context() raises ValueError
    # on platforms where it is unavailable (e.g. Windows).
    mp_context = mp.get_context("fork")
    process = mp_context.Process(target=some_expensive_calculation)
    process.start()
    resource_monitor = threading.Thread(
        target=_read_task_utilization,
        args=(process.pid,),
    )
    resource_monitor.daemon = True
    resource_monitor.start()
    process.join()
    time.sleep(3)  # Emulate additional time for a handle after the process finished
```
```console
Memory %: 0.025844573974609375
CPU %: 0.0
Memory %: 0.3806114196777344
CPU %: 1.8
Memory %: 1.0632514953613281
CPU %: 6.3
Memory %: 1.7674446105957031
CPU %: 9.9
Memory %: 1.5843391418457031
CPU %: 11.6
Memory %: 2.2841453552246094
CPU %: 12.7
Whooops! NoSuchProcess: process no longer exists (pid=78454)
Whooops! NoSuchProcess: process no longer exists (pid=78454)
Whooops! NoSuchProcess: process no longer exists (pid=78454)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]