kacpermuda commented on code in PR #40854:
URL: https://github.com/apache/airflow/pull/40854#discussion_r1684196136
##########
airflow/providers/openlineage/utils/utils.py:
##########
@@ -307,6 +308,17 @@ class TaskGroupInfo(InfoJsonEncodable):
]
+def get_airflow_dag_run_facet(dag_run: DagRun) -> dict[str, BaseFacet]:
+    if not dag_run.dag:
+        return {}
+    return {
+        "airflow_dagRun": AirflowDagRunFacet(
Review Comment:
Interesting, I missed that part of the specification about naming when I created
the `airflowState` facet.
##########
airflow/providers/openlineage/plugins/listener.py:
##########
@@ -420,7 +421,8 @@ def on_dag_run_running(self, dag_run: DagRun, msg: str) -> None:
nominal_end_time=data_interval_end,
# AirflowJobFacet should be created outside ProcessPoolExecutor that pickles objects,
# as it causes lack of some TaskGroup attributes and crashes event emission.
- job_facets={**get_airflow_job_facet(dag_run=dag_run)},
+ job_facets=get_airflow_job_facet(dag_run=dag_run),
+ run_facets=get_airflow_dag_run_facet(dag_run),
Review Comment:
Did you check whether we can do this inside the `self.adapter.dag_started`
method? The pickling problem mentioned above may not affect the `dag` and
`dagrun` objects that we use there.
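One quick way to answer this is to round-trip the objects through `pickle` and see which attributes survive. The sketch below only illustrates that check under stated assumptions: `HypotheticalTaskGroup` and `HypotheticalDagRun` are made-up stand-ins (not Airflow classes), and the dropped `children` attribute merely simulates the kind of state loss the code comment describes. Against the real objects you would pass the actual `dag_run` / `dag_run.dag` and the attributes the facet builders read.

```python
import pickle
from typing import Any, Iterable


class HypotheticalTaskGroup:
    """Hypothetical stand-in (not Airflow's TaskGroup) that loses state when pickled."""

    def __init__(self, group_id: str, children: dict[str, Any]) -> None:
        self.group_id = group_id
        self.children = children

    def __getstate__(self) -> dict[str, Any]:
        # Simulate the reported failure mode: 'children' is dropped from the pickled state.
        state = self.__dict__.copy()
        state.pop("children", None)
        return state


class HypotheticalDagRun:
    """Hypothetical stand-in (not Airflow's DagRun) whose attributes pickle normally."""

    def __init__(self, run_id: str, dag_id: str) -> None:
        self.run_id = run_id
        self.dag_id = dag_id


def missing_after_pickle(obj: Any, attrs: Iterable[str]) -> list[str]:
    """Return attributes that exist on obj but are gone after a pickle round-trip."""
    clone = pickle.loads(pickle.dumps(obj))
    return [name for name in attrs if hasattr(obj, name) and not hasattr(clone, name)]


if __name__ == "__main__":
    group = HypotheticalTaskGroup("section_1", {"task_a": "task_a"})
    run = HypotheticalDagRun("manual__2024-07-18", "example_dag")
    print(missing_after_pickle(group, ["group_id", "children"]))  # ['children']
    print(missing_after_pickle(run, ["run_id", "dag_id"]))  # []
```

An empty result for the real `dag_run` would suggest the facet could be built inside `dag_started`, though a round-trip check alone does not prove the whole event-emission path is safe.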
--
This is an automated message from the Apache Git Service.