This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1a011f9eb2a nit: Use new taskinstance method to determine if task will 
emit OL event (#57446)
1a011f9eb2a is described below

commit 1a011f9eb2aaf4c69dd6a892b2691b42f5073bc5
Author: Kacper Muda <[email protected]>
AuthorDate: Thu Oct 30 11:57:29 2025 +0100

    nit: Use new taskinstance method to determine if task will emit OL event 
(#57446)
---
 .../airflow/providers/openlineage/utils/utils.py   | 46 ++++++++++++----------
 1 file changed, 26 insertions(+), 20 deletions(-)

diff --git 
a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py 
b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
index 8ef5c23ced8..bd3901e84e7 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
@@ -33,7 +33,7 @@ from openlineage.client.utils import RedactMixin
 from airflow import __version__ as AIRFLOW_VERSION
 
 # TODO: move this maybe to Airflow's logic?
-from airflow.models import DagRun, TaskReschedule
+from airflow.models import DagRun, TaskInstance, TaskReschedule
 from airflow.models.mappedoperator import MappedOperator as 
SerializedMappedOperator
 from airflow.providers.common.compat.assets import Asset
 from airflow.providers.common.compat.sdk import DAG, BaseOperator, 
BaseSensorOperator, MappedOperator
@@ -68,7 +68,6 @@ if TYPE_CHECKING:
     from openlineage.client.event_v2 import Dataset as OpenLineageDataset
     from openlineage.client.facet_v2 import RunFacet, processing_engine_run
 
-    from airflow.models import TaskInstance
     from airflow.sdk.execution_time.secrets_masker import (
         Redactable,
         Redacted,
@@ -782,25 +781,32 @@ def _get_task_groups_details(dag: DAG | SerializedDAG) -> 
dict:
 def _emits_ol_events(task: AnyOperator) -> bool:
     config_selective_enabled = is_selective_lineage_enabled(task)
     config_disabled_for_operators = is_operator_disabled(task)
-    # empty operators without callbacks/outlets are skipped for optimization 
by Airflow
-    # in airflow.models.taskinstance.TaskInstance._schedule_downstream_tasks
-    is_skipped_as_empty_operator = all(
-        (
-            task.inherits_from_empty_operator,
-            not getattr(task, "on_execute_callback", None),
-            not getattr(task, "on_success_callback", None),
-            not task.outlets,
-            not (task.inlets and get_base_airflow_version_tuple() >= (3, 0, 
2)),  # Added in 3.0.2 #50773
-            not (
-                getattr(task, "has_on_execute_callback", None)  # Added in 
3.1.0 #54569
-                and get_base_airflow_version_tuple() >= (3, 1, 0)
-            ),
-            not (
-                getattr(task, "has_on_success_callback", None)  # Added in 
3.1.0 #54569
-                and get_base_airflow_version_tuple() >= (3, 1, 0)
-            ),
+
+    is_task_schedulable_method = getattr(TaskInstance, "is_task_schedulable", 
None)  # Added in 3.2.0 #56039
+    if is_task_schedulable_method and callable(is_task_schedulable_method):
+        is_skipped_as_empty_operator = not is_task_schedulable_method(task)
+    else:
+        # For older Airflow versions, re-create Airflow core internal logic as
+        # empty operators without callbacks/outlets are skipped for 
optimization by Airflow
+        # in 
airflow.models.taskinstance.TaskInstance._schedule_downstream_tasks or
+        # airflow.models.dagrun.DagRun.schedule_tis, depending on Airflow 
version
+        is_skipped_as_empty_operator = all(
+            (
+                task.inherits_from_empty_operator,
+                not getattr(task, "on_execute_callback", None),
+                not getattr(task, "on_success_callback", None),
+                not task.outlets,
+                not (task.inlets and get_base_airflow_version_tuple() >= (3, 
0, 2)),  # Added in 3.0.2 #50773
+                not (
+                    getattr(task, "has_on_execute_callback", None)  # Added in 
3.1.0 #54569
+                    and get_base_airflow_version_tuple() >= (3, 1, 0)
+                ),
+                not (
+                    getattr(task, "has_on_success_callback", None)  # Added in 
3.1.0 #54569
+                    and get_base_airflow_version_tuple() >= (3, 1, 0)
+                ),
+            )
         )
-    )
 
     emits_ol_events = all(
         (

Reply via email to