mobuchowski commented on code in PR #45294:
URL: https://github.com/apache/airflow/pull/45294#discussion_r1910371400


##########
providers/src/airflow/providers/openlineage/plugins/listener.py:
##########
@@ -127,35 +158,34 @@ def on_task_instance_running(
             return
 
         # Needs to be calculated outside of inner method so that it gets cached for usage in fork processes
+        data_interval_start = dagrun.data_interval_start
+        if isinstance(data_interval_start, datetime):
+            data_interval_start = data_interval_start.isoformat()
+        data_interval_end = dagrun.data_interval_end
+        if isinstance(data_interval_end, datetime):
+            data_interval_end = data_interval_end.isoformat()
+
         debug_facet = get_airflow_debug_facet()
 
         @print_warning(self.log)
         def on_running():
-            # that's a workaround to detect task running from deferred state
-            # we return here because Airflow 2.3 needs task from deferred state
-            if task_instance.next_method is not None:
-                return
-
-            if is_ti_rescheduled_already(task_instance):
+            context = task_instance.get_template_context()
+            if hasattr(context, "task_reschedule_count") and context["task_reschedule_count"] > 0:
                 self.log.debug("Skipping this instance of rescheduled task - START event was emitted already")
                 return

Review Comment:
   I think the context will eventually be cached in Airflow 3; it's essentially a
   static dict at this point. I would leave optimizations for later 🙂
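
   For reference, a minimal sketch of what such a later optimization could look like, assuming the context really is static for the lifetime of a task instance. `FakeTaskInstance`, `cached_context`, `build_count`, and `already_rescheduled` are hypothetical names for illustration, not Airflow APIs; only the method name `get_template_context` and the key `task_reschedule_count` come from the diff above.

```python
from functools import cached_property


class FakeTaskInstance:
    """Hypothetical stand-in for a task instance; not an Airflow class."""

    def __init__(self, reschedule_count: int) -> None:
        self._reschedule_count = reschedule_count
        self.build_count = 0  # counts how often the context is rebuilt

    def get_template_context(self) -> dict:
        # Assumed to be costly and to return the same data on every call.
        self.build_count += 1
        return {"task_reschedule_count": self._reschedule_count}

    @cached_property
    def cached_context(self) -> dict:
        # Built once on first access, then reused for this instance.
        return self.get_template_context()


def already_rescheduled(ti: FakeTaskInstance) -> bool:
    # Dict lookup with a default: a missing key counts as "not rescheduled".
    return ti.cached_context.get("task_reschedule_count", 0) > 0
```

   With this shape, repeated checks on the same instance build the context only once. Whether that is worth doing depends on how Airflow 3 ends up handling the context itself.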



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.