kaxil commented on code in PR #68213:
URL: https://github.com/apache/airflow/pull/68213#discussion_r3376049524


##########
task-sdk/src/airflow/sdk/bases/resumablejobmixin.py:
##########
@@ -101,29 +107,56 @@ def execute_resumable(self, context: Context) -> Any:
         Closing this window would require atomic "submit + persist", which is not possible across
         an external system boundary.
         """
-        task_store = context.get("task_store")
+        operator_tag = {"operator": type(self).__name__}
+
+        with tracer.start_as_current_span("resumable_job.resume_decision") as span:
+            span.set_attribute("operator", type(self).__name__)
+            span.set_attribute("resumable.external_id_key", self.external_id_key)
+
+            task_store = context.get("task_store")
+
+            if task_store is None:
+                span.set_attribute("resumable.decision", "no_task_store")
+            else:
+                external_id = task_store.get(self.external_id_key)
+                if external_id:
+                    incr("resumable_job.reconnect_attempt", tags=operator_tag)
+
+                    status = self.get_job_status(external_id, context)
+
+                    span.set_attribute("resumable.external_id", str(external_id))
+                    span.set_attribute("resumable.prior_status", status)
+
+                    if self.is_job_active(status):
+                        span.set_attribute("resumable.decision", "reconnect")
+                        incr("resumable_job.reconnect_success", tags=operator_tag)
+
+                        self.log.info(
+                            "Reconnecting to existing job identified by: %s (status: %s)",
+                            external_id,
+                            status,
+                        )
+
+                        return self.poll_until_complete(external_id, context)

Review Comment:
   The PR description says the span "wraps only the state load and decision
block, not the full poll duration" and "deliberately excludes
`poll_until_complete()`." But on the reconnect path,
`return self.poll_until_complete(...)` here (and, on the already-succeeded
path, `return self.get_job_result(...)` at line 148) runs inside the
`with tracer.start_as_current_span(...)` block, so the span stays open until
polling finishes. Your own Jaeger data shows it: the reconnect span is 61s
(matching the 60s `poll_until_complete` sleep), while the fresh-submit span is
~13ms. So the span scope is inconsistent across decisions, and the
reconnect/already-succeeded spans measure the full poll (or result fetch), not
just the decision. Recording the decision inside the `with` block and exiting
it before calling `poll_until_complete`/`get_job_result` would keep the span
scoped to the decision on every path.
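A minimal Python sketch of the suggested restructuring, assuming the helpers from the diff behave as their names suggest: the decision is recorded inside the span, the `with` block exits, and only then does the code poll or fetch the result. `RecordingTracer` and `RecordedSpan` are hypothetical stubs that only imitate the context-manager shape of OpenTelemetry's `start_as_current_span`, so the example runs without a tracing backend. `FakeResumableOperator`, `is_job_succeeded`, `submit_job`, and the `"submit"` decision value are likewise illustrative names, not the mixin's actual API, and the `incr(...)` metrics calls are omitted for brevity.

```python
import contextlib
import time
from dataclasses import dataclass, field
from typing import Any, Iterator, Optional


@dataclass
class RecordedSpan:
    """Hypothetical stand-in for a tracing span: keeps attributes and wall-clock duration."""

    name: str
    attributes: dict = field(default_factory=dict)
    duration: float = 0.0

    def set_attribute(self, key: str, value: Any) -> None:
        self.attributes[key] = value


class RecordingTracer:
    """Hypothetical tracer stub that only mimics the start_as_current_span context manager."""

    def __init__(self) -> None:
        self.finished = []

    @contextlib.contextmanager
    def start_as_current_span(self, name: str) -> Iterator[RecordedSpan]:
        span = RecordedSpan(name)
        start = time.monotonic()
        try:
            yield span
        finally:
            # The span "ends" when the with-block exits, however it exits.
            span.duration = time.monotonic() - start
            self.finished.append(span)


tracer = RecordingTracer()


class FakeResumableOperator:
    """Hypothetical operator; the job helpers are simplified stand-ins."""

    external_id_key = "job_id"

    def __init__(self, poll_seconds: float = 0.2) -> None:
        self.poll_seconds = poll_seconds

    def get_job_status(self, external_id: str, context: dict) -> str:
        return "RUNNING"

    def is_job_active(self, status: str) -> bool:
        return status == "RUNNING"

    def is_job_succeeded(self, status: str) -> bool:  # hypothetical helper
        return status == "SUCCEEDED"

    def poll_until_complete(self, external_id: str, context: dict) -> str:
        time.sleep(self.poll_seconds)  # stands in for a long-running poll loop
        return f"result-of-{external_id}"

    def get_job_result(self, external_id: str, context: dict) -> str:
        return f"result-of-{external_id}"

    def submit_job(self, context: dict) -> str:  # hypothetical fresh-submit path
        return "new-job"

    def execute_resumable(self, context: dict) -> str:
        external_id: Optional[str] = None
        decision = "submit"
        # 1. Decide inside the span: state load and status lookup only.
        with tracer.start_as_current_span("resumable_job.resume_decision") as span:
            span.set_attribute("operator", type(self).__name__)
            task_store = context.get("task_store")
            if task_store is None:
                decision = "no_task_store"
            else:
                external_id = task_store.get(self.external_id_key)
                if external_id:
                    status = self.get_job_status(external_id, context)
                    span.set_attribute("resumable.prior_status", status)
                    if self.is_job_active(status):
                        decision = "reconnect"
                    elif self.is_job_succeeded(status):
                        decision = "already_succeeded"
            span.set_attribute("resumable.decision", decision)
        # 2. Act outside the span, so polling time is not attributed to the decision.
        if decision == "reconnect":
            return self.poll_until_complete(external_id, context)
        if decision == "already_succeeded":
            return self.get_job_result(external_id, context)
        return self.submit_job(context)
```

Run against a task store that already holds a running job's ID, the recorded span should cover only the store lookup and status check, while the sleep in `poll_until_complete` happens after the span has ended, so reconnect and fresh-submit spans end up on a comparable scale.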



-- 
This is an automated message from the Apache Git Service.