ephraimbuddy commented on issue #25681:
URL: https://github.com/apache/airflow/issues/25681#issuecomment-1217786034

   > Is this fundamentally caused by the same issue underlying #25060 and #25200? (i.e. race condition caused by two parts of code expanding the same task)
   
   It seems like a race condition: `expand_mapped_task` is being called multiple times.
   This diff sheds more light on what's happening. It stops the crash, but shows that the unmapped ti with `map_index=-1` was not actually updated to 0 during the first call to `expand_mapped_task`:
   ```diff
   diff --git a/airflow/models/mappedoperator.py b/airflow/models/mappedoperator.py
   index 2a93e442b7..b1d9693d32 100644
   --- a/airflow/models/mappedoperator.py
   +++ b/airflow/models/mappedoperator.py
   @@ -655,6 +655,15 @@ class MappedOperator(AbstractOperator):
                else:
                    # Otherwise convert this into the first mapped index, and create
                    # TaskInstance for other indexes.
   +                if session.query(TaskInstance).filter(
   +                    TaskInstance.dag_id == self.dag_id,
   +                    TaskInstance.run_id == run_id,
   +                    TaskInstance.task_id == self.task_id,
   +                    TaskInstance.map_index == 0
   +                ).one_or_none():
   +                    self.log.error("Checked ")
   +                    unmapped_ti.state = TaskInstanceState.REMOVED
   +                    return all_expanded_tis, total_length
                    unmapped_ti.map_index = 0
                    self.log.debug("Updated in place to become %s", unmapped_ti)
                    all_expanded_tis.append(unmapped_ti)
   ```
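
To illustrate why the guard stops the crash, here is a minimal, self-contained sketch of the same idea outside Airflow. `FakeTI` and `expand` are hypothetical stand-ins (not Airflow's `TaskInstance` or `expand_mapped_task`), the "database" is just a list, and the `ValueError` is only an assumed proxy for the duplicate-key failure.

```python
from dataclasses import dataclass


@dataclass
class FakeTI:
    """Hypothetical stand-in for a task instance row; not Airflow's model."""
    task_id: str
    map_index: int
    state: str = "none"


def expand(rows, task_id, total_length, guard=True):
    """Toy expansion step: turn the map_index=-1 placeholder into index 0
    and create the remaining indexes, optionally with the guard from the diff."""
    existing = {ti.map_index for ti in rows if ti.task_id == task_id}
    unmapped = next(
        (ti for ti in rows if ti.task_id == task_id and ti.map_index == -1),
        None,
    )
    if unmapped is not None:
        if 0 in existing:
            if not guard:
                # Assumed proxy for the crash: a second row with map_index=0.
                raise ValueError("duplicate key: map_index=0 already exists")
            # Guarded path: mark the stale placeholder removed and stop early.
            unmapped.state = "removed"
            return rows
        unmapped.map_index = 0
        existing.discard(-1)
        existing.add(0)
    for index in range(total_length):
        if index not in existing:
            rows.append(FakeTI(task_id, index))
    return rows


rows = [FakeTI("t", -1)]
expand(rows, "t", 3)           # first call: placeholder becomes index 0
rows.append(FakeTI("t", -1))   # simulate a placeholder that is still at -1
expand(rows, "t", 3)           # second call: guard marks it removed
```

With `guard=False`, the second call raises instead, which mirrors the situation the diff is probing: a second expansion sees both a `map_index=-1` row and an existing `map_index=0` row.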

