This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-3-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 065cd19de4f834ceca05b51d121399a3b80719ac
Author: Ping Zhang <[email protected]>
AuthorDate: Mon Jun 6 02:47:43 2022 -0700

    Unify return_code interface for task runner (#24093)

    (cherry picked from commit 603c555e1f0f607edb3f171ca5d206f60056c656)
---
 airflow/task/task_runner/base_task_runner.py   | 2 +-
 airflow/task/task_runner/cgroup_task_runner.py | 3 ++-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/airflow/task/task_runner/base_task_runner.py b/airflow/task/task_runner/base_task_runner.py
index 47be386b74..2ab649db87 100644
--- a/airflow/task/task_runner/base_task_runner.py
+++ b/airflow/task/task_runner/base_task_runner.py
@@ -174,7 +174,7 @@ class BaseTaskRunner(LoggingMixin):
         """Start running the task instance in a subprocess."""
         raise NotImplementedError()
 
-    def return_code(self) -> Optional[int]:
+    def return_code(self, timeout: int = 0) -> Optional[int]:
         """
         :return: The return code associated with running the task instance or
             None if the task is not yet done.
diff --git a/airflow/task/task_runner/cgroup_task_runner.py b/airflow/task/task_runner/cgroup_task_runner.py
index d6c6e53abf..a5604a6d4e 100644
--- a/airflow/task/task_runner/cgroup_task_runner.py
+++ b/airflow/task/task_runner/cgroup_task_runner.py
@@ -21,6 +21,7 @@
 import datetime
 import os
 import uuid
+from typing import Optional
 
 import psutil
 from cgroupspy import trees
@@ -163,7 +164,7 @@ class CgroupTaskRunner(BaseTaskRunner):
         self.log.debug("Starting task process with cgroups cpu,memory: %s", cgroup_name)
         self.process = self.run_command(['cgexec', '-g', f'cpu,memory:{cgroup_name}'])
 
-    def return_code(self):
+    def return_code(self, timeout: int = 0) -> Optional[int]:
         return_code = self.process.poll()
         # TODO(plypaul) Monitoring the control file in the cgroup fs is better than
         # checking the return code here. The PR to use this is here:
