jedcunningham commented on code in PR #63568:
URL: https://github.com/apache/airflow/pull/63568#discussion_r2978998723


##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -758,27 +796,31 @@ def _maybe_reschedule_startup_failure(
     )
 
 
+@detail_span("parse")
 def parse(what: StartupDetails, log: Logger) -> RuntimeTaskInstance:
     # TODO: Task-SDK:
     # Using BundleDagBag here is about 98% wrong, but it'll do for now
     from airflow.dag_processing.dagbag import BundleDagBag
 
     bundle_info = what.bundle_info
-    bundle_instance = DagBundlesManager().get_bundle(
-        name=bundle_info.name,
-        version=bundle_info.version,
-    )
-    bundle_instance.initialize()
+    with detail_span("get_bundle"):
+        bundle_instance = DagBundlesManager().get_bundle(
+            name=bundle_info.name,
+            version=bundle_info.version,
+        )
+    with detail_span("initialize"):

Review Comment:
   ```suggestion
       with detail_span("bundle.initialize"):
   ```
   
   `initialize` is a pretty generic span name; `bundle.initialize` is probably clearer?



##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -1850,13 +1934,13 @@ def main():
     stats_factory = stats_utils.get_stats_factory(Stats)
     Stats.initialize(factory=stats_factory)
 
-    stack = ExitStack()
+    stack = contextlib.ExitStack()

Review Comment:
   Why switch from the bare `ExitStack()` to `contextlib.ExitStack()`?



##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -1758,73 +1827,88 @@ def finalize(
 
     task = ti.task
     # Pushing xcom for each operator extra links defined on the operator only.
-    for oe in task.operator_extra_links:
-        try:
-            link, xcom_key = oe.get_link(operator=task, ti_key=ti), 
oe.xcom_key  # type: ignore[arg-type]
-            log.debug("Setting xcom for operator extra link", link=link, 
xcom_key=xcom_key)
-            _xcom_push_to_db(ti, key=xcom_key, value=link)
-        except Exception:
-            log.exception(
-                "Failed to push an xcom for task operator extra link",
-                link_name=oe.name,
-                xcom_key=oe.xcom_key,
-                ti=ti,
-            )
-
-    if getattr(ti.task, "overwrite_rtif_after_execution", False):
-        log.debug("Overwriting Rendered template fields.")
-        if ti.task.template_fields:
+    with detail_span("handle_extra_links"):
+        for oe in task.operator_extra_links:
             try:
-                
SUPERVISOR_COMMS.send(SetRenderedFields(rendered_fields=_serialize_rendered_fields(ti.task)))
+                link, xcom_key = oe.get_link(operator=task, ti_key=ti), 
oe.xcom_key  # type: ignore[arg-type]
+                log.debug("Setting xcom for operator extra link", link=link, 
xcom_key=xcom_key)
+                _xcom_push_to_db(ti, key=xcom_key, value=link)
             except Exception:
                 log.exception(
-                    "Failed to set rendered fields during finalization",
-                    task_id=ti.task_id,
-                    dag_id=ti.dag_id,
+                    "Failed to push an xcom for task operator extra link",
+                    link_name=oe.name,
+                    xcom_key=oe.xcom_key,
+                    ti=ti,
                 )
 
+    if getattr(ti.task, "overwrite_rtif_after_execution", False):
+        with detail_span("overwrite_rtif"):
+            log.debug("Overwriting Rendered template fields.")
+            if ti.task.template_fields:
+                try:
+                    SUPERVISOR_COMMS.send(
+                        
SetRenderedFields(rendered_fields=_serialize_rendered_fields(ti.task))
+                    )
+                except Exception:
+                    log.exception(
+                        "Failed to set rendered fields during finalization",
+                        task_id=ti.task_id,
+                        dag_id=ti.dag_id,
+                    )
+
     log.debug("Running finalizers", ti=ti)
     if state == TaskInstanceState.SUCCESS:
-        _run_task_state_change_callbacks(task, "on_success_callback", context, 
log)
-        try:
-            get_listener_manager().hook.on_task_instance_success(
-                previous_state=TaskInstanceState.RUNNING, task_instance=ti
-            )
-        except Exception:
-            log.exception("error calling listener")
+        with detail_span("success_callback"):

Review Comment:
   ```suggestion
           with detail_span("on_success_callback"):
   ```
   
   Maybe name the span after the callback it wraps?



##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -1758,73 +1827,88 @@ def finalize(
 
     task = ti.task
     # Pushing xcom for each operator extra links defined on the operator only.
-    for oe in task.operator_extra_links:
-        try:
-            link, xcom_key = oe.get_link(operator=task, ti_key=ti), 
oe.xcom_key  # type: ignore[arg-type]
-            log.debug("Setting xcom for operator extra link", link=link, 
xcom_key=xcom_key)
-            _xcom_push_to_db(ti, key=xcom_key, value=link)
-        except Exception:
-            log.exception(
-                "Failed to push an xcom for task operator extra link",
-                link_name=oe.name,
-                xcom_key=oe.xcom_key,
-                ti=ti,
-            )
-
-    if getattr(ti.task, "overwrite_rtif_after_execution", False):
-        log.debug("Overwriting Rendered template fields.")
-        if ti.task.template_fields:
+    with detail_span("handle_extra_links"):
+        for oe in task.operator_extra_links:
             try:
-                
SUPERVISOR_COMMS.send(SetRenderedFields(rendered_fields=_serialize_rendered_fields(ti.task)))
+                link, xcom_key = oe.get_link(operator=task, ti_key=ti), 
oe.xcom_key  # type: ignore[arg-type]
+                log.debug("Setting xcom for operator extra link", link=link, 
xcom_key=xcom_key)
+                _xcom_push_to_db(ti, key=xcom_key, value=link)
             except Exception:
                 log.exception(
-                    "Failed to set rendered fields during finalization",
-                    task_id=ti.task_id,
-                    dag_id=ti.dag_id,
+                    "Failed to push an xcom for task operator extra link",
+                    link_name=oe.name,
+                    xcom_key=oe.xcom_key,
+                    ti=ti,
                 )
 
+    if getattr(ti.task, "overwrite_rtif_after_execution", False):
+        with detail_span("overwrite_rtif"):
+            log.debug("Overwriting Rendered template fields.")
+            if ti.task.template_fields:
+                try:
+                    SUPERVISOR_COMMS.send(
+                        
SetRenderedFields(rendered_fields=_serialize_rendered_fields(ti.task))
+                    )
+                except Exception:
+                    log.exception(
+                        "Failed to set rendered fields during finalization",
+                        task_id=ti.task_id,
+                        dag_id=ti.dag_id,
+                    )
+
     log.debug("Running finalizers", ti=ti)
     if state == TaskInstanceState.SUCCESS:
-        _run_task_state_change_callbacks(task, "on_success_callback", context, 
log)
-        try:
-            get_listener_manager().hook.on_task_instance_success(
-                previous_state=TaskInstanceState.RUNNING, task_instance=ti
-            )
-        except Exception:
-            log.exception("error calling listener")
+        with detail_span("success_callback"):
+            _run_task_state_change_callbacks(task, "on_success_callback", 
context, log)
+        with detail_span("listener.success_callback"):

Review Comment:
   ```suggestion
           with detail_span("listener.on_task_instance_success"):
   ```
   
   Maybe name the span after the listener hook it wraps?



##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -1265,7 +1322,7 @@ def _on_term(signum, frame):
                 import jinja2
 
                 # If the task failed, swallow rendering error so it doesn't mask the main error.
-                with suppress(jinja2.TemplateSyntaxError, jinja2.UndefinedError):
+                with contextlib.suppress(jinja2.TemplateSyntaxError, jinja2.UndefinedError):

Review Comment:
   Why change `suppress` to `contextlib.suppress` here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to