This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1a011f9eb2a nit: Use new taskinstance method to determine if task will emit OL event (#57446)
1a011f9eb2a is described below
commit 1a011f9eb2aaf4c69dd6a892b2691b42f5073bc5
Author: Kacper Muda <[email protected]>
AuthorDate: Thu Oct 30 11:57:29 2025 +0100
nit: Use new taskinstance method to determine if task will emit OL event (#57446)
---
.../airflow/providers/openlineage/utils/utils.py | 46 ++++++++++++----------
1 file changed, 26 insertions(+), 20 deletions(-)
diff --git a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
index 8ef5c23ced8..bd3901e84e7 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
@@ -33,7 +33,7 @@ from openlineage.client.utils import RedactMixin
from airflow import __version__ as AIRFLOW_VERSION
# TODO: move this maybe to Airflow's logic?
-from airflow.models import DagRun, TaskReschedule
+from airflow.models import DagRun, TaskInstance, TaskReschedule
from airflow.models.mappedoperator import MappedOperator as SerializedMappedOperator
from airflow.providers.common.compat.assets import Asset
from airflow.providers.common.compat.sdk import DAG, BaseOperator, BaseSensorOperator, MappedOperator
@@ -68,7 +68,6 @@ if TYPE_CHECKING:
from openlineage.client.event_v2 import Dataset as OpenLineageDataset
from openlineage.client.facet_v2 import RunFacet, processing_engine_run
- from airflow.models import TaskInstance
from airflow.sdk.execution_time.secrets_masker import (
Redactable,
Redacted,
@@ -782,25 +781,32 @@ def _get_task_groups_details(dag: DAG | SerializedDAG) -> dict:
def _emits_ol_events(task: AnyOperator) -> bool:
config_selective_enabled = is_selective_lineage_enabled(task)
config_disabled_for_operators = is_operator_disabled(task)
- # empty operators without callbacks/outlets are skipped for optimization by Airflow
- # in airflow.models.taskinstance.TaskInstance._schedule_downstream_tasks
- is_skipped_as_empty_operator = all(
- (
- task.inherits_from_empty_operator,
- not getattr(task, "on_execute_callback", None),
- not getattr(task, "on_success_callback", None),
- not task.outlets,
- not (task.inlets and get_base_airflow_version_tuple() >= (3, 0, 2)), # Added in 3.0.2 #50773
- not (
- getattr(task, "has_on_execute_callback", None) # Added in 3.1.0 #54569
- and get_base_airflow_version_tuple() >= (3, 1, 0)
- ),
- not (
- getattr(task, "has_on_success_callback", None) # Added in 3.1.0 #54569
- and get_base_airflow_version_tuple() >= (3, 1, 0)
- ),
+
+ is_task_schedulable_method = getattr(TaskInstance, "is_task_schedulable", None) # Added in 3.2.0 #56039
+ if is_task_schedulable_method and callable(is_task_schedulable_method):
+ is_skipped_as_empty_operator = not is_task_schedulable_method(task)
+ else:
+ # For older Airflow versions, re-create Airflow core internal logic as
+ # empty operators without callbacks/outlets are skipped for optimization by Airflow
+ # in airflow.models.taskinstance.TaskInstance._schedule_downstream_tasks or
+ # airflow.models.dagrun.DagRun.schedule_tis, depending on Airflow version
+ is_skipped_as_empty_operator = all(
+ (
+ task.inherits_from_empty_operator,
+ not getattr(task, "on_execute_callback", None),
+ not getattr(task, "on_success_callback", None),
+ not task.outlets,
+ not (task.inlets and get_base_airflow_version_tuple() >= (3, 0, 2)), # Added in 3.0.2 #50773
+ not (
+ getattr(task, "has_on_execute_callback", None) # Added in 3.1.0 #54569
+ and get_base_airflow_version_tuple() >= (3, 1, 0)
+ ),
+ not (
+ getattr(task, "has_on_success_callback", None) # Added in 3.1.0 #54569
+ and get_base_airflow_version_tuple() >= (3, 1, 0)
+ ),
+ )
)
- )
emits_ol_events = all(
(