mobuchowski commented on code in PR #41690:
URL: https://github.com/apache/airflow/pull/41690#discussion_r1728750924


##########
airflow/providers/openlineage/plugins/listener.py:
##########
@@ -413,65 +415,112 @@ def before_stopping(self, component) -> None:
 
     @hookimpl
     def on_dag_run_running(self, dag_run: DagRun, msg: str) -> None:
-        if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
-            self.log.debug(
-                "Skipping OpenLineage event emission for DAG `%s` "
-                "due to lack of explicit lineage enablement for DAG while "
-                "[openlineage] selective_enable is on.",
-                dag_run.dag_id,
+        try:
+            if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
+                self.log.debug(
+                    "Skipping OpenLineage event emission for DAG `%s` "
+                    "due to lack of explicit lineage enablement for DAG while "
+                    "[openlineage] selective_enable is on.",
+                    dag_run.dag_id,
+                )
+                return
+
+            if not self.executor:
+                self.log.debug("Executor have not started before 
`on_dag_run_running`")
+                return
+
+            data_interval_start = (
+                dag_run.data_interval_start.isoformat() if 
dag_run.data_interval_start else None
             )
-            return
+            data_interval_end = dag_run.data_interval_end.isoformat() if 
dag_run.data_interval_end else None
 
-        if not self.executor:
-            self.log.debug("Executor have not started before 
`on_dag_run_running`")
-            return
+            run_facets = {**get_airflow_dag_run_facet(dag_run)}
 
-        data_interval_start = dag_run.data_interval_start.isoformat() if 
dag_run.data_interval_start else None
-        data_interval_end = dag_run.data_interval_end.isoformat() if 
dag_run.data_interval_end else None
-        self.executor.submit(
-            self.adapter.dag_started,
-            dag_run=dag_run,
-            msg=msg,
-            nominal_start_time=data_interval_start,
-            nominal_end_time=data_interval_end,
-            # AirflowJobFacet should be created outside ProcessPoolExecutor 
that pickles objects,
-            # as it causes lack of some TaskGroup attributes and crashes event 
emission.
-            job_facets=get_airflow_job_facet(dag_run=dag_run),
-        )
+            self.submit_callable(
+                self.adapter.dag_started,
+                dag_id=dag_run.dag_id,
+                run_id=dag_run.run_id,
+                execution_date=dag_run.execution_date,
+                start_date=dag_run.start_date,
+                nominal_start_time=data_interval_start,
+                nominal_end_time=data_interval_end,
+                run_facets=run_facets,
+                owners=[x.strip() for x in dag_run.dag.owner.split(",")] if 
dag_run.dag else None,
+                description=dag_run.dag.description if dag_run.dag else None,
+                # AirflowJobFacet should be created outside 
ProcessPoolExecutor that pickles objects,
+                # as it causes lack of some TaskGroup attributes and crashes 
event emission.
+                job_facets=get_airflow_job_facet(dag_run=dag_run),
+            )
+        except BaseException as e:
+            self.log.warning("OpenLineage received exception in method 
on_dag_run_running", exc_info=e)
 
     @hookimpl
     def on_dag_run_success(self, dag_run: DagRun, msg: str) -> None:
-        if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
-            self.log.debug(
-                "Skipping OpenLineage event emission for DAG `%s` "
-                "due to lack of explicit lineage enablement for DAG while "
-                "[openlineage] selective_enable is on.",
-                dag_run.dag_id,
-            )
-            return
+        try:
+            if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
+                self.log.debug(
+                    "Skipping OpenLineage event emission for DAG `%s` "
+                    "due to lack of explicit lineage enablement for DAG while "
+                    "[openlineage] selective_enable is on.",
+                    dag_run.dag_id,
+                )
+                return
 
-        if not self.executor:
-            self.log.debug("Executor have not started before 
`on_dag_run_success`")
-            return
+            if not self.executor:
+                self.log.debug("Executor have not started before 
`on_dag_run_success`")
+                return
 
-        self.executor.submit(self.adapter.dag_success, dag_run=dag_run, 
msg=msg)
+            task_ids = DagRun._get_partial_task_ids(dag_run.dag)

Review Comment:
   This is what `get_task_instances` does underneath. I don't think we want
to change it as part of this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to