mobuchowski commented on code in PR #41690:
URL: https://github.com/apache/airflow/pull/41690#discussion_r1731123544


##########
airflow/providers/openlineage/plugins/listener.py:
##########
@@ -413,65 +415,112 @@ def before_stopping(self, component) -> None:
 
     @hookimpl
     def on_dag_run_running(self, dag_run: DagRun, msg: str) -> None:
-        if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
-            self.log.debug(
-                "Skipping OpenLineage event emission for DAG `%s` "
-                "due to lack of explicit lineage enablement for DAG while "
-                "[openlineage] selective_enable is on.",
-                dag_run.dag_id,
+        try:
+            if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
+                self.log.debug(
+                    "Skipping OpenLineage event emission for DAG `%s` "
+                    "due to lack of explicit lineage enablement for DAG while "
+                    "[openlineage] selective_enable is on.",
+                    dag_run.dag_id,
+                )
+                return
+
+            if not self.executor:
+                self.log.debug("Executor have not started before 
`on_dag_run_running`")
+                return
+
+            data_interval_start = (
+                dag_run.data_interval_start.isoformat() if 
dag_run.data_interval_start else None
             )
-            return
+            data_interval_end = dag_run.data_interval_end.isoformat() if 
dag_run.data_interval_end else None
 
-        if not self.executor:
-            self.log.debug("Executor have not started before 
`on_dag_run_running`")
-            return
+            run_facets = {**get_airflow_dag_run_facet(dag_run)}
 
-        data_interval_start = dag_run.data_interval_start.isoformat() if 
dag_run.data_interval_start else None
-        data_interval_end = dag_run.data_interval_end.isoformat() if 
dag_run.data_interval_end else None
-        self.executor.submit(
-            self.adapter.dag_started,
-            dag_run=dag_run,
-            msg=msg,
-            nominal_start_time=data_interval_start,
-            nominal_end_time=data_interval_end,
-            # AirflowJobFacet should be created outside ProcessPoolExecutor 
that pickles objects,
-            # as it causes lack of some TaskGroup attributes and crashes event 
emission.
-            job_facets=get_airflow_job_facet(dag_run=dag_run),
-        )
+            self.submit_callable(
+                self.adapter.dag_started,
+                dag_id=dag_run.dag_id,
+                run_id=dag_run.run_id,
+                execution_date=dag_run.execution_date,
+                start_date=dag_run.start_date,
+                nominal_start_time=data_interval_start,
+                nominal_end_time=data_interval_end,
+                run_facets=run_facets,
+                owners=[x.strip() for x in dag_run.dag.owner.split(",")] if 
dag_run.dag else None,
+                description=dag_run.dag.description if dag_run.dag else None,
+                # AirflowJobFacet should be created outside 
ProcessPoolExecutor that pickles objects,
+                # as it causes lack of some TaskGroup attributes and crashes 
event emission.
+                job_facets=get_airflow_job_facet(dag_run=dag_run),
+            )
+        except BaseException as e:
+            self.log.warning("OpenLineage received exception in method 
on_dag_run_running", exc_info=e)
 
     @hookimpl
     def on_dag_run_success(self, dag_run: DagRun, msg: str) -> None:
-        if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
-            self.log.debug(
-                "Skipping OpenLineage event emission for DAG `%s` "
-                "due to lack of explicit lineage enablement for DAG while "
-                "[openlineage] selective_enable is on.",
-                dag_run.dag_id,
-            )
-            return
+        try:
+            if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
+                self.log.debug(
+                    "Skipping OpenLineage event emission for DAG `%s` "
+                    "due to lack of explicit lineage enablement for DAG while "
+                    "[openlineage] selective_enable is on.",
+                    dag_run.dag_id,
+                )
+                return
 
-        if not self.executor:
-            self.log.debug("Executor have not started before 
`on_dag_run_success`")
-            return
+            if not self.executor:
+                self.log.debug("Executor have not started before 
`on_dag_run_success`")
+                return
 
-        self.executor.submit(self.adapter.dag_success, dag_run=dag_run, 
msg=msg)
+            task_ids = DagRun._get_partial_task_ids(dag_run.dag)
+            self.submit_callable(
+                self.adapter.dag_success,
+                dag_id=dag_run.dag_id,
+                run_id=dag_run.run_id,
+                end_date=dag_run.end_date,
+                execution_date=dag_run.execution_date,
+                task_ids=task_ids,
+                state=dag_run.get_state(),
+            )
+        except BaseException as e:
+            self.log.warning("OpenLineage received exception in method 
on_dag_run_success", exc_info=e)
 
     @hookimpl
     def on_dag_run_failed(self, dag_run: DagRun, msg: str) -> None:
-        if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
-            self.log.debug(
-                "Skipping OpenLineage event emission for DAG `%s` "
-                "due to lack of explicit lineage enablement for DAG while "
-                "[openlineage] selective_enable is on.",
-                dag_run.dag_id,
+        try:
+            if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
+                self.log.debug(
+                    "Skipping OpenLineage event emission for DAG `%s` "
+                    "due to lack of explicit lineage enablement for DAG while "
+                    "[openlineage] selective_enable is on.",
+                    dag_run.dag_id,
+                )
+                return
+
+            if not self.executor:
+                self.log.debug("Executor have not started before 
`on_dag_run_failed`")
+                return
+
+            self.submit_callable(
+                self.adapter.dag_failed,
+                dag_id=dag_run.dag_id,
+                run_id=dag_run.run_id,
+                end_date=dag_run.end_date,
+                execution_date=dag_run.execution_date,
+                state=dag_run.get_state(),
+                msg=msg,
             )

Review Comment:
   Fixed.



##########
airflow/providers/openlineage/plugins/adapter.py:
##########
@@ -375,18 +377,29 @@ def dag_started(
             # This part cannot be wrapped to deduplicate code, otherwise the 
method cannot be pickled in multiprocessing.
             self.log.warning("Failed to emit DAG started event: \n %s", 
traceback.format_exc())
 
-    def dag_success(self, dag_run: DagRun, msg: str):
+    def dag_success(
+        self,
+        dag_id: str,
+        run_id: str,
+        end_date: datetime,
+        execution_date: datetime,
+        state: DagRunState,

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to