ephraimbuddy commented on issue #25681:
URL: https://github.com/apache/airflow/issues/25681#issuecomment-1217786034
> Is this fundamentally caused by the same issue underlying #25060 and
#25200? (i.e. race condition caused by two parts of code expanding the same
task)
It seems like a race condition...expand_mapped_task being called multiple
times
This diff will throw more light to what's happening. It stops the crash but
shows that the unmapped ti with index=-1 was not really updated to 0 during the
first call to expand_mapped_task:
```diff
diff --git a/airflow/models/mappedoperator.py
b/airflow/models/mappedoperator.py
index 2a93e442b7..b1d9693d32 100644
--- a/airflow/models/mappedoperator.py
+++ b/airflow/models/mappedoperator.py
@@ -655,6 +655,15 @@ class MappedOperator(AbstractOperator):
else:
# Otherwise convert this into the first mapped index, and
create
# TaskInstance for other indexes.
+ if session.query(TaskInstance).filter(
+ TaskInstance.dag_id == self.dag_id,
+ TaskInstance.run_id == run_id,
+ TaskInstance.task_id == self.task_id,
+ TaskInstance.map_index == 0
+ ).one_or_none():
+ self.log.error("Checked ")
+ unmapped_ti.state = TaskInstanceState.REMOVED
+ return all_expanded_tis, total_length
unmapped_ti.map_index = 0
self.log.debug("Updated in place to become %s", unmapped_ti)
all_expanded_tis.append(unmapped_ti)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]