This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch openlineage-processexecution
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit d2340c362d88ce6bebde2a962d1f93228e50b0c5
Author: Maciej Obuchowski <[email protected]>
AuthorDate: Tue May 21 15:44:21 2024 +0200

    fork openlineage
    
    Signed-off-by: Maciej Obuchowski <[email protected]>
---
 airflow/config_templates/config.yml               |  7 ++
 airflow/jobs/local_task_job_runner.py             | 19 +++--
 airflow/models/taskinstance.py                    |  2 +
 airflow/providers/openlineage/plugins/listener.py | 86 +++++++++++++++--------
 4 files changed, 81 insertions(+), 33 deletions(-)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 95d83f9d4c..d23cf2612b 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -329,6 +329,13 @@ core:
       type: string
       example: ~
       default: "downstream"
+    task_listener_timeout:
+      description: |
+        Maximum time (in seconds) that task listeners will have for their execution.
+      version_added: 2.10.0
+      type: integer
+      example: ~
+      default: "10"
     default_task_execution_timeout:
       description: |
        The default task execution_timeout value for the operators. Expected an integer value to
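
For context, the new option is read through Airflow's standard configuration
API, exactly as the local_task_job_runner.py hunk below does. A minimal sketch
(the caller is hypothetical; option name and default come from the hunk above):

    from airflow.configuration import conf

    # Returns 10 unless [core] task_listener_timeout is overridden in
    # airflow.cfg or via AIRFLOW__CORE__TASK_LISTENER_TIMEOUT.
    listener_timeout = conf.getint("core", "task_listener_timeout")
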
diff --git a/airflow/jobs/local_task_job_runner.py b/airflow/jobs/local_task_job_runner.py
index bb520825f2..ccd6622ba6 100644
--- a/airflow/jobs/local_task_job_runner.py
+++ b/airflow/jobs/local_task_job_runner.py
@@ -110,6 +110,8 @@ class LocalTaskJobRunner(BaseJobRunner, LoggingMixin):
         self.terminating = False
 
         self._state_change_checks = 0
+        # time spent after the task completed but before it exited - used to measure listener execution time
+        self._overtime = 0.0
 
     def _execute(self) -> int | None:
         from airflow.task.task_runner import get_task_runner
@@ -195,7 +197,7 @@ class LocalTaskJobRunner(BaseJobRunner, LoggingMixin):
                        self.job.heartrate if self.job.heartrate is not None else heartbeat_time_limit,
                     ),
                 )
-
+                self.log.error("WAITING TIME %f", max_wait_time)
                return_code = self.task_runner.return_code(timeout=max_wait_time)
                 if return_code is not None:
                     self.handle_task_exit(return_code)
@@ -251,6 +253,7 @@ class LocalTaskJobRunner(BaseJobRunner, LoggingMixin):
     def heartbeat_callback(self, session: Session = NEW_SESSION) -> None:
         """Self destruct task if state has been moved away from running 
externally."""
         if self.terminating:
+            self.log.error("TERMINATING")
             # ensure termination if processes are created later
             self.task_runner.terminate()
             return
@@ -290,6 +293,8 @@ class LocalTaskJobRunner(BaseJobRunner, LoggingMixin):
                 )
                 raise AirflowException("PID of job runner does not match")
        elif self.task_runner.return_code() is None and hasattr(self.task_runner, "process"):
+            self._overtime = (timezone.utcnow() - (ti.end_date or timezone.utcnow())).total_seconds()
+            self.log.error("checking process code? overtime %f", self._overtime)
             if ti.state == TaskInstanceState.SKIPPED:
                # A DagRun timeout will cause tasks to be externally marked as skipped.
                 dagrun = ti.get_dagrun(session=session)
@@ -304,13 +309,19 @@ class LocalTaskJobRunner(BaseJobRunner, LoggingMixin):
                     self.log.warning("DagRun timed out after %s.", 
execution_time)
 
            # potential race condition, the _run_raw_task commits `success` or other state
-            # but task_runner does not exit right away due to slow process shutdown or any other reasons
-            # let's do a throttle here, if the above case is true, the handle_task_exit will handle it
-            if self._state_change_checks >= 1:  # defer to next round of heartbeat
+            # but task_runner does not exit right away due to slow process shutdown, listener execution
+            # or any other reasons - let's do a throttle here, if the above case is true, the
+            # handle_task_exit will handle it
+            if self._state_change_checks >= 1 and self._overtime > conf.getint(
+                "core", "task_listener_timeout"
+            ):
+                self.log.warning("Overtime: %f", self._overtime)
                 self.log.warning(
                     "State of this instance has been externally set to %s. 
Terminating instance.", ti.state
                 )
                 self.terminating = True
+            else:
+                self.log.error("still checking - will kill on next call")
             self._state_change_checks += 1
 
     def _log_return_code_metric(self, return_code: int):
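
The heartbeat-side logic above reduces to: once the task instance has reached a
terminal state but the runner process is still alive, measure how long it has
lingered and terminate only after the configured listener budget is spent. A
minimal standalone sketch of that check, with all names local to the example:

    from datetime import datetime, timezone

    TASK_LISTENER_TIMEOUT = 10  # seconds; mirrors [core] task_listener_timeout

    def should_terminate(end_date, state_change_checks):
        """end_date is the task instance's recorded end time (None if unset)."""
        now = datetime.now(timezone.utc)
        # Same shape as the hunk: overtime is 0.0 while end_date is unset.
        overtime = (now - (end_date or now)).total_seconds()
        # Defer at least one heartbeat, then grant listeners their full budget.
        return state_change_checks >= 1 and overtime > TASK_LISTENER_TIMEOUT
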
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index a27579b05e..0dd5cb3175 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -2525,10 +2525,12 @@ class TaskInstance(Base, LoggingMixin):
             except (AirflowFailException, AirflowSensorTimeout) as e:
                 # If AirflowFailException is raised, task should not retry.
                # If a sensor in reschedule mode reaches timeout, task should not retry.
+                self.log.exception("FAIL?")
                self.handle_failure(e, test_mode, context, force_fail=True, session=session)
                 session.commit()
                 raise
            except (AirflowTaskTimeout, AirflowException, AirflowTaskTerminated) as e:
+                self.log.exception("TIMEOUT? WUT?")
                 if not test_mode:
                     self.refresh_from_db(lock_for_update=True, session=session)
                 # for case when task is marked as success/failed externally
diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py
index ef736308f7..13a869907e 100644
--- a/airflow/providers/openlineage/plugins/listener.py
+++ b/airflow/providers/openlineage/plugins/listener.py
@@ -26,7 +26,6 @@ import psutil
 from openlineage.client.serde import Serde
 
 from airflow import __version__ as airflow_version
-from airflow.api_internal.internal_api_call import InternalApiConfig
 from airflow.listeners import hookimpl
 from airflow.providers.openlineage import conf
 from airflow.providers.openlineage.extractors import ExtractorManager
@@ -39,6 +38,7 @@ from airflow.providers.openlineage.utils.utils import (
     is_selective_lineage_enabled,
     print_warning,
 )
+from airflow.settings import configure_orm
 from airflow.stats import Stats
 from airflow.utils.timeout import timeout
 
@@ -133,7 +133,6 @@ class OpenLineageListener:
                dagrun.data_interval_start.isoformat() if dagrun.data_interval_start else None
             )
            data_interval_end = dagrun.data_interval_end.isoformat() if dagrun.data_interval_end else None
-
             redacted_event = self.adapter.start_task(
                 run_id=task_uuid,
                 job_name=get_job_name(task),
@@ -156,20 +155,28 @@ class OpenLineageListener:
                 len(Serde.to_json(redacted_event).encode("utf-8")),
             )
 
+        # on_running()
+
         pid = os.fork()
         if pid:
+            self.log.info("On Start: Spawned new process with pid %s", pid)
             process = psutil.Process(pid)
-            process.wait(5)
+            try:
+                self.log.info("Waiting for process %s", pid)
+                process.wait(10)
+            except psutil.TimeoutExpired:
+                self.log.error("TIMEOUT EXPIRED")
+                process.kill()
+                self.log.error("TIMEOUT EXPIRED for %s - AFTER KILL", pid)
+            except BaseException:
+                self.log.error("BASE_EXCEPTION")
+            self.log.info("Process with pid %s finished - parent", pid)
         else:
-            if not InternalApiConfig.get_use_internal_api():
-                # Force a new SQLAlchemy session. We can't share open DB handles
-                # between process. The cli code will re-create this as part of its
-                # normal startup
-                from airflow import settings
-
-                settings.engine.pool.dispose()
-                settings.engine.dispose()
+            configure_orm(disable_connection_pool=True)
+            self.log.info("After fork - new process with current PID.")
             on_running()
+            self.log.info("Process with current pid finishes after on_running")
+            os._exit(0)
 
     @hookimpl
    def on_task_instance_success(self, previous_state, task_instance: TaskInstance, session):
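
All three hooks now share the same fork-and-bounded-wait shape: the parent (the
task process) waits on the child for a fixed budget and kills it on expiry, so
a hung listener can no longer delay task exit indefinitely. A minimal sketch of
the pattern outside any Airflow context (run_bounded and the 10-second default
are illustrative only):

    import os
    import psutil

    def run_bounded(callback, timeout=10):
        pid = os.fork()
        if pid:
            # Parent: reap the child, killing it if it outlives the budget.
            child = psutil.Process(pid)
            try:
                child.wait(timeout)
            except psutil.TimeoutExpired:
                child.kill()
        else:
            # Child: do the work, then exit without running exit handlers.
            try:
                callback()
            finally:
                os._exit(0)

(Note that the success hook below still passes 5 to process.wait while the
other two hooks pass 10.)
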
@@ -217,6 +224,11 @@ class OpenLineageListener:
                 )
 
            end_date = task_instance.end_date if task_instance.end_date else datetime.now()
+            self.log.error("PRE_RE")
+            import re
+
+            re.match(r"(a?){30}a{30}", "a" * 30)
+            self.log.error("POST_RE")
 
             redacted_event = self.adapter.complete_task(
                 run_id=task_uuid,
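
The re.match call added above looks like a deliberate stall rather than real
work: (a?){30}a{30} against "a" * 30 is the textbook catastrophic-backtracking
pattern, so the success hook blocks long enough to exercise the new timeout
path. The cost grows roughly exponentially with the repetition count, which
smaller values show safely (timings are illustrative and machine-dependent):

    import re
    import time

    for n in (10, 14, 18, 22):
        start = time.monotonic()
        re.match(r"(a?){%d}a{%d}" % (n, n), "a" * n)
        print(n, round(time.monotonic() - start, 3), "s")
    # Runtime grows geometrically with n on CPython's backtracking engine;
    # n=30 effectively never returns within the listener budget.
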
@@ -231,20 +243,28 @@ class OpenLineageListener:
                 len(Serde.to_json(redacted_event).encode("utf-8")),
             )
 
+        # on_success()
+
         pid = os.fork()
         if pid:
+            self.log.info("On Success: Spawned new process with pid %s", pid)
             process = psutil.Process(pid)
-            process.wait(5)
+            try:
+                self.log.info("Waiting for process %s", pid)
+                process.wait(5)
+            except psutil.TimeoutExpired:
+                self.log.error("TIMEOUT EXPIRED")
+                process.kill()
+                self.log.error("TIMEOUT EXPIRED for %s - AFTER KILL", pid)
+            except BaseException:
+                self.log.error("BASE_EXCEPTION")
+            self.log.info("Process with pid %s finished - parent", pid)
         else:
-            if not InternalApiConfig.get_use_internal_api():
-                # Force a new SQLAlchemy session. We can't share open DB handles
-                # between process. The cli code will re-create this as part of its
-                # normal startup
-                from airflow import settings
-
-                settings.engine.pool.dispose()
-                settings.engine.dispose()
+            configure_orm(disable_connection_pool=True)
+            self.log.info("After fork - new process with current PID.")
             on_success()
+            self.log.info("Process with current pid finishes after on_success")
+            os._exit(0)
 
     @hookimpl
    def on_task_instance_failed(self, previous_state, task_instance: TaskInstance, session):
@@ -306,20 +326,28 @@ class OpenLineageListener:
                 len(Serde.to_json(redacted_event).encode("utf-8")),
             )
 
+        # on_failure()
+
         pid = os.fork()
         if pid:
+            self.log.info("On Fail: Spawned new process with pid %s", pid)
             process = psutil.Process(pid)
-            process.wait(5)
+            try:
+                self.log.info("Waiting for process %s", pid)
+                process.wait(10)
+            except psutil.TimeoutExpired:
+                self.log.error("TIMEOUT EXPIRED")
+                process.kill()
+                self.log.error("TIMEOUT EXPIRED for %s - AFTER KILL", pid)
+            except BaseException:
+                self.log.error("BASE_EXCEPTION")
+            self.log.info("Process with pid %s finished - parent", pid)
         else:
-            if not InternalApiConfig.get_use_internal_api():
-                # Force a new SQLAlchemy session. We can't share open DB handles
-                # between process. The cli code will re-create this as part of its
-                # normal startup
-                from airflow import settings
-
-                settings.engine.pool.dispose()
-                settings.engine.dispose()
+            configure_orm(disable_connection_pool=True)
+            self.log.info("After fork - new process with current PID.")
             on_failure()
+            self.log.info("Process with current pid finishes after on_failure")
+            os._exit(0)
 
     @property
     def executor(self):
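
The configure_orm(disable_connection_pool=True) call replaces the removed
manual engine disposal but addresses the same hazard: a forked child must not
reuse the parent's open database connections. Rebuilding the ORM without a
connection pool gives the child fresh connections on first use. A minimal
illustration of the underlying pattern with plain SQLAlchemy (the DSN is
hypothetical):

    import os
    from sqlalchemy import create_engine

    engine = create_engine("postgresql://app@db/airflow")  # hypothetical DSN

    pid = os.fork()
    if pid == 0:
        # Child: sockets inherited from the parent's pool would corrupt the
        # wire protocol if reused; drop them so the child reconnects lazily.
        engine.dispose()
        os._exit(0)
    os.waitpid(pid, 0)  # parent reaps the child
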
