This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch openlineage-processexecution
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit d2340c362d88ce6bebde2a962d1f93228e50b0c5
Author: Maciej Obuchowski <[email protected]>
AuthorDate: Tue May 21 15:44:21 2024 +0200

    fork openlineage

    Signed-off-by: Maciej Obuchowski <[email protected]>
---
 airflow/config_templates/config.yml               |  7 ++
 airflow/jobs/local_task_job_runner.py             | 19 +++--
 airflow/models/taskinstance.py                    |  2 +
 airflow/providers/openlineage/plugins/listener.py | 86 +++++++++++++++--------
 4 files changed, 81 insertions(+), 33 deletions(-)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 95d83f9d4c..d23cf2612b 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -329,6 +329,13 @@ core:
       type: string
       example: ~
       default: "downstream"
+    task_listener_timeout:
+      description: |
+        Maximum time (in seconds) that task listeners will have for their execution.
+      version_added: 2.10.0
+      type: integer
+      example: ~
+      default: "10"
     default_task_execution_timeout:
       description: |
         The default task execution_timeout value for the operators. Expected an integer value to
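The new option is read later in the runner hunk through Airflow's standard
configuration accessor. A minimal sketch of that read, assuming only the hunk
above (the fallback keyword is an extra safeguard, not part of the patch):

    from airflow.configuration import conf

    # getint parses the string default "10" from config.yml into an int
    listener_timeout = conf.getint("core", "task_listener_timeout", fallback=10)
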
diff --git a/airflow/jobs/local_task_job_runner.py b/airflow/jobs/local_task_job_runner.py
index bb520825f2..ccd6622ba6 100644
--- a/airflow/jobs/local_task_job_runner.py
+++ b/airflow/jobs/local_task_job_runner.py
@@ -110,6 +110,8 @@ class LocalTaskJobRunner(BaseJobRunner, LoggingMixin):
         self.terminating = False
         self._state_change_checks = 0
+        # time spent after the task completed, but before it exited - used to measure listener execution time
+        self._overtime = 0.0

     def _execute(self) -> int | None:
         from airflow.task.task_runner import get_task_runner
@@ -195,7 +197,7 @@ class LocalTaskJobRunner(BaseJobRunner, LoggingMixin):
                     self.job.heartrate if self.job.heartrate is not None else heartbeat_time_limit,
                 ),
             )
-
+            self.log.error("WAITING TIME %f", max_wait_time)
             return_code = self.task_runner.return_code(timeout=max_wait_time)
             if return_code is not None:
                 self.handle_task_exit(return_code)
@@ -251,6 +253,7 @@ class LocalTaskJobRunner(BaseJobRunner, LoggingMixin):
     def heartbeat_callback(self, session: Session = NEW_SESSION) -> None:
         """Self destruct task if state has been moved away from running externally."""
         if self.terminating:
+            self.log.error("TERMINATING")
             # ensure termination if processes are created later
             self.task_runner.terminate()
             return
@@ -290,6 +293,8 @@ class LocalTaskJobRunner(BaseJobRunner, LoggingMixin):
             )
             raise AirflowException("PID of job runner does not match")
         elif self.task_runner.return_code() is None and hasattr(self.task_runner, "process"):
+            self._overtime = (timezone.utcnow() - (ti.end_date or timezone.utcnow())).total_seconds()
+            self.log.error("checking process code? overtime %f", self._overtime)
             if ti.state == TaskInstanceState.SKIPPED:
                 # A DagRun timeout will cause tasks to be externally marked as skipped.
                 dagrun = ti.get_dagrun(session=session)
@@ -304,13 +309,19 @@ class LocalTaskJobRunner(BaseJobRunner, LoggingMixin):
                     self.log.warning("DagRun timed out after %s.", execution_time)

             # potential race condition, the _run_raw_task commits `success` or other state
-            # but task_runner does not exit right away due to slow process shutdown or any other reasons
-            # let's do a throttle here, if the above case is true, the handle_task_exit will handle it
-            if self._state_change_checks >= 1:  # defer to next round of heartbeat
+            # but task_runner does not exit right away due to slow process shutdown, listener execution
+            # or any other reasons - let's do a throttle here, if the above case is true, the
+            # handle_task_exit will handle it
+            if self._state_change_checks >= 1 and self._overtime > conf.getint(
+                "core", "task_listener_timeout"
+            ):
+                self.log.warning("Overtime: %f", self._overtime)
                 self.log.warning(
                     "State of this instance has been externally set to %s. Terminating instance.", ti.state
                 )
                 self.terminating = True
+            else:
+                self.log.error("still checking - will kill on next call")
             self._state_change_checks += 1

     def _log_return_code_metric(self, return_code: int):
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index a27579b05e..0dd5cb3175 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -2525,10 +2525,12 @@ class TaskInstance(Base, LoggingMixin):
         except (AirflowFailException, AirflowSensorTimeout) as e:
             # If AirflowFailException is raised, task should not retry.
             # If a sensor in reschedule mode reaches timeout, task should not retry.
+            self.log.exception("FAIL?")
             self.handle_failure(e, test_mode, context, force_fail=True, session=session)
             session.commit()
             raise
         except (AirflowTaskTimeout, AirflowException, AirflowTaskTerminated) as e:
+            self.log.exception("TIMEOUT? WUT?")
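In effect, the runner hunk above measures how long the process lingers after
the task instance reached a terminal state, and only self-terminates once that
overtime exceeds the new listener budget. A condensed sketch of the check,
where ti and terminate_runner are hypothetical stand-ins for the runner's own
state and self.terminating flag:

    from airflow.configuration import conf
    from airflow.utils import timezone

    # end_date is committed by _run_raw_task; the gap from it to "now" is time
    # the process has spent in shutdown work such as listeners.
    overtime = (timezone.utcnow() - (ti.end_date or timezone.utcnow())).total_seconds()
    if overtime > conf.getint("core", "task_listener_timeout"):
        terminate_runner()  # hypothetical stand-in for self.terminating = True
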
WUT?") if not test_mode: self.refresh_from_db(lock_for_update=True, session=session) # for case when task is marked as success/failed externally diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py index ef736308f7..13a869907e 100644 --- a/airflow/providers/openlineage/plugins/listener.py +++ b/airflow/providers/openlineage/plugins/listener.py @@ -26,7 +26,6 @@ import psutil from openlineage.client.serde import Serde from airflow import __version__ as airflow_version -from airflow.api_internal.internal_api_call import InternalApiConfig from airflow.listeners import hookimpl from airflow.providers.openlineage import conf from airflow.providers.openlineage.extractors import ExtractorManager @@ -39,6 +38,7 @@ from airflow.providers.openlineage.utils.utils import ( is_selective_lineage_enabled, print_warning, ) +from airflow.settings import configure_orm from airflow.stats import Stats from airflow.utils.timeout import timeout @@ -133,7 +133,6 @@ class OpenLineageListener: dagrun.data_interval_start.isoformat() if dagrun.data_interval_start else None ) data_interval_end = dagrun.data_interval_end.isoformat() if dagrun.data_interval_end else None - redacted_event = self.adapter.start_task( run_id=task_uuid, job_name=get_job_name(task), @@ -156,20 +155,28 @@ class OpenLineageListener: len(Serde.to_json(redacted_event).encode("utf-8")), ) + # on_running() + pid = os.fork() if pid: + self.log.info("On Start: Spawned new process with pid %s", pid) process = psutil.Process(pid) - process.wait(5) + try: + self.log.info("Waiting for process %s", pid) + process.wait(10) + except psutil.TimeoutExpired: + self.log.error("TIMEOUT EXPIRED") + process.kill() + self.log.error("TIMEOUT EXPIRED for %s - AFTER KILL", pid) + except BaseException: + self.log.error("BASE_EXCEPTION") + self.log.info("Process with pid %s finished - parent", pid) else: - if not InternalApiConfig.get_use_internal_api(): - # Force a new SQLAlchemy session. We can't share open DB handles - # between process. The cli code will re-create this as part of its - # normal startup - from airflow import settings - - settings.engine.pool.dispose() - settings.engine.dispose() + configure_orm(disable_connection_pool=True) + self.log.info("After fork - new process with current PID.") on_running() + self.log.info("Process with current pid finishes after on_running") + os._exit(0) @hookimpl def on_task_instance_success(self, previous_state, task_instance: TaskInstance, session): @@ -217,6 +224,11 @@ class OpenLineageListener: ) end_date = task_instance.end_date if task_instance.end_date else datetime.now() + self.log.error("PRE_RE") + import re + + re.match(r"(a?){30}a{30}", "a" * 30) + self.log.error("POST_RE") redacted_event = self.adapter.complete_task( run_id=task_uuid, @@ -231,20 +243,28 @@ class OpenLineageListener: len(Serde.to_json(redacted_event).encode("utf-8")), ) + # on_success() + pid = os.fork() if pid: + self.log.info("On Success: Spawned new process with pid %s", pid) process = psutil.Process(pid) - process.wait(5) + try: + self.log.info("Waiting for process %s", pid) + process.wait(5) + except psutil.TimeoutExpired: + self.log.error("TIMEOUT EXPIRED") + process.kill() + self.log.error("TIMEOUT EXPIRED for %s - AFTER KILL", pid) + except BaseException: + self.log.error("BASE_EXCEPTION") + self.log.info("Process with pid %s finished - parent", pid) else: - if not InternalApiConfig.get_use_internal_api(): - # Force a new SQLAlchemy session. 
@@ -217,6 +224,11 @@ class OpenLineageListener:
         )
         end_date = task_instance.end_date if task_instance.end_date else datetime.now()

+        self.log.error("PRE_RE")
+        import re
+
+        re.match(r"(a?){30}a{30}", "a" * 30)
+        self.log.error("POST_RE")
         redacted_event = self.adapter.complete_task(
             run_id=task_uuid,
@@ -231,20 +243,28 @@ class OpenLineageListener:
                 len(Serde.to_json(redacted_event).encode("utf-8")),
             )

+        # on_success()
+
         pid = os.fork()
         if pid:
+            self.log.info("On Success: Spawned new process with pid %s", pid)
             process = psutil.Process(pid)
-            process.wait(5)
+            try:
+                self.log.info("Waiting for process %s", pid)
+                process.wait(5)
+            except psutil.TimeoutExpired:
+                self.log.error("TIMEOUT EXPIRED")
+                process.kill()
+                self.log.error("TIMEOUT EXPIRED for %s - AFTER KILL", pid)
+            except BaseException:
+                self.log.error("BASE_EXCEPTION")
+            self.log.info("Process with pid %s finished - parent", pid)
         else:
-            if not InternalApiConfig.get_use_internal_api():
-                # Force a new SQLAlchemy session. We can't share open DB handles
-                # between process. The cli code will re-create this as part of its
-                # normal startup
-                from airflow import settings
-
-                settings.engine.pool.dispose()
-                settings.engine.dispose()
+            configure_orm(disable_connection_pool=True)
+            self.log.info("After fork - new process with current PID.")
             on_success()
+            self.log.info("Process with current pid finishes after on_success")
+            os._exit(0)

     @hookimpl
     def on_task_instance_failed(self, previous_state, task_instance: TaskInstance, session):
@@ -306,20 +326,28 @@ class OpenLineageListener:
                 len(Serde.to_json(redacted_event).encode("utf-8")),
             )

+        # on_failure()
+
         pid = os.fork()
         if pid:
+            self.log.info("On Fail: Spawned new process with pid %s", pid)
             process = psutil.Process(pid)
-            process.wait(5)
+            try:
+                self.log.info("Waiting for process %s", pid)
+                process.wait(10)
+            except psutil.TimeoutExpired:
+                self.log.error("TIMEOUT EXPIRED")
+                process.kill()
+                self.log.error("TIMEOUT EXPIRED for %s - AFTER KILL", pid)
+            except BaseException:
+                self.log.error("BASE_EXCEPTION")
+            self.log.info("Process with pid %s finished - parent", pid)
         else:
-            if not InternalApiConfig.get_use_internal_api():
-                # Force a new SQLAlchemy session. We can't share open DB handles
-                # between process. The cli code will re-create this as part of its
-                # normal startup
-                from airflow import settings
-
-                settings.engine.pool.dispose()
-                settings.engine.dispose()
+            configure_orm(disable_connection_pool=True)
+            self.log.info("After fork - new process with current PID.")
             on_failure()
+            self.log.info("Process with current pid finishes after on_failure")
+            os._exit(0)

     @property
     def executor(self):
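One line worth calling out: the re.match(r"(a?){30}a{30}", "a" * 30) inserted
into on_task_instance_success is a classic catastrophic-backtracking pattern,
presumably left in as debug instrumentation to make the success hook
artificially slow and exercise the runner's new overtime-based termination. A
small sketch of why it is slow (smaller values of n keep it runnable):

    import re
    import time

    # (a?){n}a{n} against "a" * n makes the engine try exponentially many ways
    # of splitting the a's between the two sub-patterns before it matches.
    for n in (16, 18, 20):
        start = time.perf_counter()
        re.match(rf"(a?){{{n}}}a{{{n}}}", "a" * n)
        print(n, f"{time.perf_counter() - start:.2f}s")

Each increment of n roughly doubles the running time, so at n = 30 this single
line keeps the hook busy far longer than the listener timeout introduced above.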
