ephraimbuddy commented on code in PR #59639:
URL: https://github.com/apache/airflow/pull/59639#discussion_r2635107226


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2584,8 +2584,16 @@ def adopt_or_reset_orphaned_tasks(self, session: Session = NEW_SESSION) -> int:
                     reset_tis_message = []
                     for ti in to_reset:
                         reset_tis_message.append(repr(ti))
+                        # If we reset a TI, it will be eligible to be scheduled again.
+                        # This can cause the scheduler to increase the try_number on the TI.
+                        # Record the current try to TaskInstanceHistory first so users have an audit trail for
+                        # the attempt that was abandoned.
+                        ti.prepare_db_for_next_try(session=session)
+
                         ti.state = None
                         ti.queued_by_job_id = None
+                        ti.external_executor_id = None
+                        ti.clear_next_method_args()

Review Comment:
   We need to reset external_executor_id so that, in k8s, the pod will have a different ID and will not interfere with adoption/cleanup. In effect, this treats the reset as a new attempt.
   clear_next_method_args is called too, so that the TI does not behave as if it were resuming from deferred but starts as a new attempt.
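
A minimal sketch of the reset order described above, not the actual Airflow code. `FakeTI`, `reset_orphaned`, and the `history` list are hypothetical stand-ins: the real change calls `ti.prepare_db_for_next_try(session=session)` and `ti.clear_next_method_args()` on the TaskInstance, and the sketch assumes the latter amounts to clearing `next_method` and `next_kwargs`.

```python
from dataclasses import asdict, dataclass
from typing import Any, Dict, List, Optional


@dataclass
class FakeTI:
    """Hypothetical stand-in for a TaskInstance; not the real Airflow model."""

    task_id: str
    try_number: int
    state: Optional[str] = None
    queued_by_job_id: Optional[int] = None
    external_executor_id: Optional[str] = None
    next_method: Optional[str] = None
    next_kwargs: Optional[Dict[str, Any]] = None


def reset_orphaned(ti: FakeTI, history: List[Dict[str, Any]]) -> None:
    """Snapshot the abandoned attempt, then clear everything tied to it."""
    # Stand-in for prepare_db_for_next_try: keep an audit record of the
    # attempt *before* any field is cleared.
    history.append(asdict(ti))

    # Make the TI eligible for scheduling again.
    ti.state = None
    ti.queued_by_job_id = None

    # Drop the executor-side identifier so the next attempt is not
    # mistaken for the abandoned one during adoption or cleanup.
    ti.external_executor_id = None

    # Assumed equivalent of clear_next_method_args: forget any deferral
    # resume point so the task starts from the beginning.
    ti.next_method = None
    ti.next_kwargs = None


if __name__ == "__main__":
    ti = FakeTI(
        task_id="example_task",
        try_number=1,
        state="running",
        queued_by_job_id=7,
        external_executor_id="pod-abc",
        next_method="execute_complete",
        next_kwargs={"event": "done"},
    )
    history: List[Dict[str, Any]] = []
    reset_orphaned(ti, history)
    print(history[0]["external_executor_id"], ti.external_executor_id)
```

The snapshot is taken first on purpose: if the fields were cleared before recording, the history entry would lose the executor ID and state of the attempt it is meant to document.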


