This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a commit to branch ol-add-runfacet-complete-fail in repository https://gitbox.apache.org/repos/asf/airflow.git
commit f3ad57ba821a1b2ad98067cd634aaeea74b9bb48 Author: Maciej Obuchowski <[email protected]> AuthorDate: Wed Jul 24 14:35:21 2024 +0200 openlineage: add AirflowRun on COMPLETE/FAIL events Signed-off-by: Maciej Obuchowski <[email protected]> --- airflow/providers/openlineage/plugins/adapter.py | 9 +++++---- airflow/providers/openlineage/plugins/listener.py | 10 ++++++++-- tests/providers/openlineage/plugins/test_adapter.py | 5 ++++- tests/providers/openlineage/plugins/test_listener.py | 20 ++++++++++++++++---- 4 files changed, 33 insertions(+), 11 deletions(-) diff --git a/airflow/providers/openlineage/plugins/adapter.py b/airflow/providers/openlineage/plugins/adapter.py index 7405556088..e71334184e 100644 --- a/airflow/providers/openlineage/plugins/adapter.py +++ b/airflow/providers/openlineage/plugins/adapter.py @@ -178,7 +178,7 @@ class OpenLineageAdapter(LoggingMixin): nominal_end_time: str | None, owners: list[str], task: OperatorLineage | None, - run_facets: dict[str, RunFacet] | None = None, # Custom run facets + run_facets: dict[str, RunFacet] | None = None, ) -> RunEvent: """ Emit openlineage event of type START. @@ -243,7 +243,7 @@ class OpenLineageAdapter(LoggingMixin): parent_run_id: str | None, end_time: str, task: OperatorLineage, - run_facets: dict[str, RunFacet] | None = None, # Custom run facets + run_facets: dict[str, RunFacet] | None = None, ) -> RunEvent: """ Emit openlineage event of type COMPLETE. 
@@ -255,7 +255,7 @@ class OpenLineageAdapter(LoggingMixin): :param parent_run_id: identifier of job spawning this task :param end_time: time of task completion :param task: metadata container with information extracted from operator - :param run_facets: custom run facets + :param run_facets: additional run facets """ run_facets = run_facets or {} if task: @@ -285,8 +285,8 @@ class OpenLineageAdapter(LoggingMixin): parent_run_id: str | None, end_time: str, task: OperatorLineage, - run_facets: dict[str, RunFacet] | None = None, # Custom run facets error: str | BaseException | None = None, + run_facets: dict[str, RunFacet] | None = None, ) -> RunEvent: """ Emit openlineage event of type FAIL. @@ -300,6 +300,7 @@ class OpenLineageAdapter(LoggingMixin): :param task: metadata container with information extracted from operator :param run_facets: custom run facets :param error: error + :param run_facets: additional run facets """ run_facets = run_facets or {} if task: diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py index 58ccdcad24..2f227fa2e6 100644 --- a/airflow/providers/openlineage/plugins/listener.py +++ b/airflow/providers/openlineage/plugins/listener.py @@ -235,7 +235,10 @@ class OpenLineageListener: parent_run_id=parent_run_id, end_time=end_date.isoformat(), task=task_metadata, - run_facets=get_user_provided_run_facets(task_instance, TaskInstanceState.SUCCESS), + run_facets={ + **get_user_provided_run_facets(task_instance, TaskInstanceState.SUCCESS), + **get_airflow_run_facet(dagrun, dag, task_instance, task, task_uuid), + }, ) Stats.gauge( f"ol.event.size.{event_type}.{operator_name}", @@ -330,8 +333,11 @@ class OpenLineageListener: parent_run_id=parent_run_id, end_time=end_date.isoformat(), task=task_metadata, - run_facets=get_user_provided_run_facets(task_instance, TaskInstanceState.FAILED), error=error, + run_facets={ + **get_user_provided_run_facets(task_instance, TaskInstanceState.FAILED), + 
+ **get_airflow_run_facet(dagrun, dag, task_instance, task, task_uuid), + }, ) Stats.gauge( f"ol.event.size.{event_type}.{operator_name}", diff --git a/tests/providers/openlineage/plugins/test_adapter.py b/tests/providers/openlineage/plugins/test_adapter.py index b648bb51d3..3deeac3894 100644 --- a/tests/providers/openlineage/plugins/test_adapter.py +++ b/tests/providers/openlineage/plugins/test_adapter.py @@ -45,7 +45,10 @@ from airflow.operators.empty import EmptyOperator from airflow.providers.openlineage.conf import namespace from airflow.providers.openlineage.extractors import OperatorLineage from airflow.providers.openlineage.plugins.adapter import _PRODUCER, OpenLineageAdapter -from airflow.providers.openlineage.plugins.facets import AirflowDagRunFacet, AirflowStateRunFacet +from airflow.providers.openlineage.plugins.facets import ( + AirflowDagRunFacet, + AirflowStateRunFacet, +) from airflow.providers.openlineage.utils.utils import get_airflow_job_facet from airflow.utils.task_group import TaskGroup from tests.test_utils.config import conf_vars diff --git a/tests/providers/openlineage/plugins/test_listener.py b/tests/providers/openlineage/plugins/test_listener.py index b05a934e02..3b0c9f0159 100644 --- a/tests/providers/openlineage/plugins/test_listener.py +++ b/tests/providers/openlineage/plugins/test_listener.py @@ -264,11 +264,16 @@ def test_adapter_start_task_is_called_with_proper_arguments( @mock.patch("airflow.providers.openlineage.plugins.listener.is_operator_disabled") @mock.patch("airflow.providers.openlineage.plugins.listener.OpenLineageAdapter") +@mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_run_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_user_provided_run_facets") @mock.patch("airflow.providers.openlineage.plugins.listener.get_job_name") @mock.patch("airflow.providers.openlineage.plugins.listener.OpenLineageListener._execute", new=regular_call) def
test_adapter_fail_task_is_called_with_proper_arguments( - mock_get_job_name, mock_get_user_provided_run_facets, mocked_adapter, mock_disabled + mock_get_job_name, + mock_get_user_provided_run_facets, + mock_get_airflow_run_facet, + mocked_adapter, + mock_disabled, ): """Tests that the 'fail_task' method of the OpenLineageAdapter is invoked with the correct arguments. @@ -289,6 +294,7 @@ def test_adapter_fail_task_is_called_with_proper_arguments( mocked_adapter.build_dag_run_id.side_effect = mock_dag_id mocked_adapter.build_task_instance_run_id.side_effect = mock_task_id mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2} + mock_get_airflow_run_facet.return_value = {"airflow": {"task": "..."}} mock_disabled.return_value = False err = ValueError("test") @@ -305,18 +311,23 @@ def test_adapter_fail_task_is_called_with_proper_arguments( parent_run_id="execution_date.dag_id", run_id="execution_date.dag_id.task_id.1", task=listener.extractor_manager.extract_metadata(), - run_facets={"custom_user_facet": 2}, + run_facets={"custom_user_facet": 2, "airflow": {"task": "..."}}, **expected_err_kwargs, ) @mock.patch("airflow.providers.openlineage.plugins.listener.is_operator_disabled") @mock.patch("airflow.providers.openlineage.plugins.listener.OpenLineageAdapter") +@mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_run_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_user_provided_run_facets") @mock.patch("airflow.providers.openlineage.plugins.listener.get_job_name") @mock.patch("airflow.providers.openlineage.plugins.listener.OpenLineageListener._execute", new=regular_call) def test_adapter_complete_task_is_called_with_proper_arguments( - mock_get_job_name, mock_get_user_provided_run_facets, mocked_adapter, mock_disabled + mock_get_job_name, + mock_get_user_provided_run_facets, + mock_get_airflow_run_facet, + mocked_adapter, + mock_disabled, ): """Tests that the 'complete_task' method of the
OpenLineageAdapter is called with the correct arguments. @@ -338,6 +349,7 @@ def test_adapter_complete_task_is_called_with_proper_arguments( mocked_adapter.build_dag_run_id.side_effect = mock_dag_id mocked_adapter.build_task_instance_run_id.side_effect = mock_task_id mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2} + mock_get_airflow_run_facet.return_value = {"airflow": {"task": "..."}} mock_disabled.return_value = False listener.on_task_instance_success(None, task_instance, None) @@ -352,7 +364,7 @@ def test_adapter_complete_task_is_called_with_proper_arguments( parent_run_id="execution_date.dag_id", run_id=f"execution_date.dag_id.task_id.{EXPECTED_TRY_NUMBER_1}", task=listener.extractor_manager.extract_metadata(), - run_facets={"custom_user_facet": 2}, + run_facets={"custom_user_facet": 2, "airflow": {"task": "..."}}, )
