mobuchowski commented on code in PR #49040:
URL: https://github.com/apache/airflow/pull/49040#discussion_r2039466756


##########
providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py:
##########
@@ -338,10 +345,20 @@ def on_success():
         def on_task_instance_failed(
             self,
             previous_state: TaskInstanceState,
-            task_instance: TaskInstance,
+            task_instance: RuntimeTaskInstance | TaskInstance,
             error: None | str | BaseException,
         ) -> None:
             self.log.debug("OpenLineage listener got notification about task 
instance failure")
+
+            if isinstance(task_instance, TaskInstance):
+                self._on_task_instance_manual_state_change(
+                    ti=task_instance,
+                    dagrun=task_instance.dag_run,
+                    ti_state=TaskInstanceState.FAILED,
+                    error=error,
+                )
+                return
+

Review Comment:
   Nice - best to separate those paths as much as possible.



##########
providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py:
##########
@@ -457,6 +474,60 @@ def on_failure():
 
         self._execute(on_failure, "on_failure", use_fork=True)
 
+    def _on_task_instance_manual_state_change(
+        self,
+        ti: TaskInstance,
+        dagrun: DagRun,
+        ti_state: TaskInstanceState,
+        error: None | str | BaseException = None,
+    ) -> None:
+        self.log.debug("`_on_task_instance_manual_state_change` was called 
with state: `%s`.", ti_state)
+        end_date = timezone.utcnow()
+
+        @print_warning(self.log)
+        def on_state_change():
+            date = dagrun.logical_date or dagrun.run_after
+            parent_run_id = self.adapter.build_dag_run_id(
+                dag_id=dagrun.dag_id,
+                logical_date=date,
+                clear_number=dagrun.clear_number,
+            )
+
+            task_uuid = self.adapter.build_task_instance_run_id(
+                dag_id=dagrun.dag_id,
+                task_id=ti.task_id,
+                try_number=ti.try_number,
+                logical_date=date,
+                map_index=ti.map_index,
+            )
+
+            adapter_kwargs = {
+                "run_id": task_uuid,
+                "job_name": get_job_name(ti),
+                "parent_job_name": dagrun.dag_id,
+                "parent_run_id": parent_run_id,
+                "end_time": end_date.isoformat(),
+                "task": OperatorLineage(),
+                "run_facets": get_airflow_debug_facet(),
+            }
+
+            if ti_state == TaskInstanceState.FAILED:
+                event_type = RunState.FAIL.value.lower()
+                redacted_event = self.adapter.fail_task(**adapter_kwargs, 
error=error)
+            elif ti_state == TaskInstanceState.SUCCESS:
+                event_type = RunState.COMPLETE.value.lower()
+                redacted_event = self.adapter.complete_task(**adapter_kwargs)
+            else:
+                raise ValueError(f"Unsupported ti_state: `{ti_state}`.")

Review Comment:
   Can we even change state to something else, or is this a purely defensive check?



##########
providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py:
##########
@@ -457,6 +474,60 @@ def on_failure():
 
         self._execute(on_failure, "on_failure", use_fork=True)
 
+    def _on_task_instance_manual_state_change(
+        self,
+        ti: TaskInstance,
+        dagrun: DagRun,
+        ti_state: TaskInstanceState,
+        error: None | str | BaseException = None,
+    ) -> None:
+        self.log.debug("`_on_task_instance_manual_state_change` was called 
with state: `%s`.", ti_state)
+        end_date = timezone.utcnow()
+
+        @print_warning(self.log)
+        def on_state_change():
+            date = dagrun.logical_date or dagrun.run_after
+            parent_run_id = self.adapter.build_dag_run_id(
+                dag_id=dagrun.dag_id,
+                logical_date=date,
+                clear_number=dagrun.clear_number,
+            )
+
+            task_uuid = self.adapter.build_task_instance_run_id(
+                dag_id=dagrun.dag_id,
+                task_id=ti.task_id,
+                try_number=ti.try_number,
+                logical_date=date,
+                map_index=ti.map_index,
+            )
+
+            adapter_kwargs = {
+                "run_id": task_uuid,
+                "job_name": get_job_name(ti),
+                "parent_job_name": dagrun.dag_id,
+                "parent_run_id": parent_run_id,
+                "end_time": end_date.isoformat(),
+                "task": OperatorLineage(),
+                "run_facets": get_airflow_debug_facet(),
+            }
+
+            if ti_state == TaskInstanceState.FAILED:
+                event_type = RunState.FAIL.value.lower()
+                redacted_event = self.adapter.fail_task(**adapter_kwargs, 
error=error)
+            elif ti_state == TaskInstanceState.SUCCESS:
+                event_type = RunState.COMPLETE.value.lower()
+                redacted_event = self.adapter.complete_task(**adapter_kwargs)
+            else:
+                raise ValueError(f"Unsupported ti_state: `{ti_state}`.")
+
+            operator_name = ti.operator.lower()
+            Stats.gauge(
+                f"ol.event.size.{event_type}.{operator_name}",
+                len(Serde.to_json(redacted_event).encode("utf-8")),
+            )

Review Comment:
   Rethinking those stats, I don't think a gauge makes sense here - a histogram 
would make more sense. But that's for a separate PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to