dolfinus commented on code in PR #40854:
URL: https://github.com/apache/airflow/pull/40854#discussion_r1684314367


##########
airflow/providers/openlineage/plugins/listener.py:
##########
@@ -420,7 +421,8 @@ def on_dag_run_running(self, dag_run: DagRun, msg: str) -> 
None:
             nominal_end_time=data_interval_end,
             # AirflowJobFacet should be created outside ProcessPoolExecutor 
that pickles objects,
             # as it causes lack of some TaskGroup attributes and crashes event 
emission.
-            job_facets={**get_airflow_job_facet(dag_run=dag_run)},
+            job_facets=get_airflow_job_facet(dag_run=dag_run),
+            run_facets=get_airflow_dag_run_facet(dag_run),

Review Comment:
   Moved building run facet and filling up nominal start/end time to the 
adapter.
   
   Tested on demo dag, don't see any issues with DagRun pickling, unlike 
TaskGroup. Here is an example of DAG run start event:
   <details>
   
   ```json
   {
     "eventTime": "2024-07-19T12:27:26.050677+00:00",
     "eventType": "START",
     "inputs": [],
     "job": {
       "facets": {
         "airflow": {
           "_producer": 
"https://github.com/apache/airflow/tree/providers-openlineage/1.9.1";,
           "_schemaURL": 
"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet";,
           "taskGroups": {
             "section_1": {
               "tooltip": "Tasks for section_1",
               "ui_color": "CornflowerBlue",
               "ui_fgcolor": "#000",
               "ui_label": "section_1"
             },
             "section_2": {
               "tooltip": "Tasks for section_2",
               "ui_color": "CornflowerBlue",
               "ui_fgcolor": "#000",
               "ui_label": "section_2"
             },
             "section_2.inner_section_2": {
               "parent_group": "section_2",
               "tooltip": "Tasks for inner_section2",
               "ui_color": "CornflowerBlue",
               "ui_fgcolor": "#000",
               "ui_label": "inner_section_2"
             }
           },
           "taskTree": {
             "start": {
               "section_1.task_1": {
                 "section_1.task_2": {
                   "section_2.inner_section_2.task_2": {
                     "section_2.inner_section_2.task_4": {
                       "end": {}
                     }
                   },
                   "section_2.inner_section_2.task_3": {
                     "section_2.inner_section_2.task_4": {
                       "end": {}
                     }
                   },
                   "section_2.task_1": {
                     "end": {}
                   }
                 },
                 "section_1.task_3": {
                   "section_2.inner_section_2.task_2": {
                     "section_2.inner_section_2.task_4": {
                       "end": {}
                     }
                   },
                   "section_2.inner_section_2.task_3": {
                     "section_2.inner_section_2.task_4": {
                       "end": {}
                     }
                   },
                   "section_2.task_1": {
                     "end": {}
                   }
                 }
               }
             }
           },
           "tasks": {
             "end": {
               "emits_ol_events": false,
               "is_setup": false,
               "is_teardown": false,
               "operator": "airflow.operators.empty.EmptyOperator",
               "ui_color": "#e8f7e4",
               "ui_fgcolor": "#000",
               "ui_label": "end"
             },
             "section_1.task_1": {
               "emits_ol_events": false,
               "is_setup": false,
               "is_teardown": false,
               "operator": "airflow.operators.empty.EmptyOperator",
               "task_group": "section_1",
               "ui_color": "#e8f7e4",
               "ui_fgcolor": "#000",
               "ui_label": "task_1"
             },
             "section_1.task_2": {
               "emits_ol_events": true,
               "is_setup": false,
               "is_teardown": false,
               "operator": "airflow.operators.bash.BashOperator",
               "task_group": "section_1",
               "ui_color": "#f0ede4",
               "ui_fgcolor": "#000",
               "ui_label": "task_2"
             },
             "section_1.task_3": {
               "emits_ol_events": false,
               "is_setup": false,
               "is_teardown": false,
               "operator": "airflow.operators.empty.EmptyOperator",
               "task_group": "section_1",
               "ui_color": "#e8f7e4",
               "ui_fgcolor": "#000",
               "ui_label": "task_3"
             },
             "section_2.inner_section_2.task_2": {
               "emits_ol_events": true,
               "is_setup": false,
               "is_teardown": false,
               "operator": "airflow.operators.bash.BashOperator",
               "task_group": "section_2.inner_section_2",
               "ui_color": "#f0ede4",
               "ui_fgcolor": "#000",
               "ui_label": "task_2"
             },
             "section_2.inner_section_2.task_3": {
               "emits_ol_events": false,
               "is_setup": false,
               "is_teardown": false,
               "operator": "airflow.operators.empty.EmptyOperator",
               "task_group": "section_2.inner_section_2",
               "ui_color": "#e8f7e4",
               "ui_fgcolor": "#000",
               "ui_label": "task_3"
             },
             "section_2.inner_section_2.task_4": {
               "emits_ol_events": false,
               "is_setup": false,
               "is_teardown": false,
               "operator": "airflow.operators.empty.EmptyOperator",
               "task_group": "section_2.inner_section_2",
               "ui_color": "#e8f7e4",
               "ui_fgcolor": "#000",
               "ui_label": "task_4"
             },
             "section_2.task_1": {
               "emits_ol_events": false,
               "is_setup": false,
               "is_teardown": false,
               "operator": "airflow.operators.empty.EmptyOperator",
               "task_group": "section_2",
               "ui_color": "#e8f7e4",
               "ui_fgcolor": "#000",
               "ui_label": "task_1"
             },
             "start": {
               "emits_ol_events": false,
               "is_setup": false,
               "is_teardown": false,
               "operator": "airflow.operators.empty.EmptyOperator",
               "ui_color": "#e8f7e4",
               "ui_fgcolor": "#000",
               "ui_label": "start"
             }
           }
         },
         "jobType": {
           "_producer": 
"https://github.com/apache/airflow/tree/providers-openlineage/1.9.1";,
           "_schemaURL": 
"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/JobTypeJobFacet";,
           "integration": "AIRFLOW",
           "jobType": "DAG",
           "processingType": "BATCH"
         },
         "ownership": {
           "_producer": 
"https://github.com/apache/airflow/tree/providers-openlineage/1.9.1";,
           "_schemaURL": 
"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/OwnershipJobFacet";,
           "owners": [
             {
               "name": "airflow"
             }
           ]
         }
       },
       "name": "example_task_group",
       "namespace": "default"
     },
     "outputs": [],
     "producer": 
"https://github.com/apache/airflow/tree/providers-openlineage/1.9.1";,
     "run": {
       "facets": {
         "airflow_dagRun": {
           "_producer": 
"https://github.com/apache/airflow/tree/providers-openlineage/1.9.1";,
           "_schemaURL": 
"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet";,
           "dag": {
             "dag_id": "example_task_group",
             "owner": "airflow",
             "schedule_interval": "86400.0 seconds",
             "start_date": "2021-01-01T00:00:00+00:00",
             "tags": [
               "example"
             ],
             "timetable": {
               "delta": 86400.0
             }
           },
           "dagRun": {
             "conf": {},
             "dag_id": "example_task_group",
             "data_interval_end": "2024-07-19T12:27:25.372580+00:00",
             "data_interval_start": "2024-07-18T12:27:25.372580+00:00",
             "external_trigger": true,
             "run_id": "manual__2024-07-19T12:27:25.372580+00:00",
             "run_type": "manual",
             "start_date": "2024-07-19T12:27:26.050677+00:00"
           }
         },
         "nominalTime": {
           "_producer": 
"https://github.com/apache/airflow/tree/providers-openlineage/1.9.1";,
           "_schemaURL": 
"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/NominalTimeRunFacet";,
           "nominalEndTime": "2024-07-19T12:27:25.372580+00:00",
           "nominalStartTime": "2024-07-18T12:27:25.372580+00:00"
         }
       },
       "runId": "0190caf6-1d3c-724f-856c-c0b8c295ec09"
     },
     "schemaURL": 
"https://openlineage.io/spec/1-0-5/OpenLineage.json#/definitions/RunEvent";
   }
   ```
   </details>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to