kacpermuda commented on code in PR #40854:
URL: https://github.com/apache/airflow/pull/40854#discussion_r1684196136


##########
airflow/providers/openlineage/utils/utils.py:
##########
@@ -307,6 +308,17 @@ class TaskGroupInfo(InfoJsonEncodable):
     ]
 
 
+def get_airflow_dag_run_facet(dag_run: DagRun) -> dict[str, BaseFacet]:
+    if not dag_run.dag:
+        return {}
+    return {
+        "airflow_dagRun": AirflowDagRunFacet(

Review Comment:
   Interesting, I missed that part of the specification about naming when I created
the `airflowState` facet.



##########
airflow/providers/openlineage/plugins/listener.py:
##########
@@ -420,7 +421,8 @@ def on_dag_run_running(self, dag_run: DagRun, msg: str) -> None:
             nominal_end_time=data_interval_end,
             # AirflowJobFacet should be created outside ProcessPoolExecutor that pickles objects,
             # as it causes lack of some TaskGroup attributes and crashes event emission.
-            job_facets={**get_airflow_job_facet(dag_run=dag_run)},
+            job_facets=get_airflow_job_facet(dag_run=dag_run),
+            run_facets=get_airflow_dag_run_facet(dag_run),

Review Comment:
   Did you check if we can perform this inside the `self.adapter.dag_started` 
method? It's possible that the pickling problem mentioned above doesn't affect 
the `dag` and `dagrun` that we use.
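
   For context, here is a minimal sketch of the failure mode the code comment describes: an object that loses attributes when pickled, so a facet built on the receiving side fails while one built beforehand succeeds. `FakeTaskGroup` and `build_facet` are hypothetical stand-ins for illustration, not Airflow or OpenLineage APIs, and the dropped attribute is an assumption. A plain `pickle` round trip stands in for what `ProcessPoolExecutor` does to submitted arguments.

```python
import pickle


class FakeTaskGroup:
    """Hypothetical stand-in for an object whose pickled form omits a field."""

    def __init__(self, group_id, tooltip):
        self.group_id = group_id
        self.tooltip = tooltip

    def __getstate__(self):
        # Assumption for illustration: serialization drops one attribute.
        state = self.__dict__.copy()
        state.pop("tooltip", None)
        return state


def build_facet(group):
    # Hypothetical facet builder: reads every attribute it needs.
    return {"group_id": group.group_id, "tooltip": group.tooltip}


original = FakeTaskGroup("section_1", "extract step")

# Building the facet before the object is pickled works.
facet_before = build_facet(original)

# A pickle round trip mimics what a worker process would receive.
round_tripped = pickle.loads(pickle.dumps(original))

try:
    build_facet(round_tripped)
    worker_ok = True
except AttributeError:
    # The dropped attribute is missing on the unpickled copy.
    worker_ok = False
```

   If `dag` and `dag_run` survive a round trip like this with every attribute the run facet reads, building the facet inside `dag_started` should be safe; otherwise it has to stay in the listener.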


