Jorricks commented on issue #26068:
URL: https://github.com/apache/airflow/issues/26068#issuecomment-1231806851
From what I can see, this only affects TaskInstances created by older
Airflow versions, as long as you don't use the `ti_mutation_hook`.
```
def _get_task_creator(
    self, created_counts: Dict[str, int], ti_mutation_hook: Callable, hook_is_noop: bool
) -> Callable:
    """
    Get the task creator function.

    This function also updates the created_counts dictionary with the number of tasks created.

    :param created_counts: Dictionary of task_type -> count of created TIs
    :param ti_mutation_hook: task_instance_mutation_hook function
    :param hook_is_noop: Whether the task_instance_mutation_hook is a noop
    """
    if hook_is_noop:

        def create_ti_mapping(task: "Operator", indexes: Tuple[int, ...]) -> Generator:
            created_counts[task.task_type] += 1
            for map_index in indexes:
                yield TI.insert_mapping(self.run_id, task, map_index=map_index)

        creator = create_ti_mapping
    else:

        def create_ti(task: "Operator", indexes: Tuple[int, ...]) -> Generator:
            for map_index in indexes:
                ti = TI(task, run_id=self.run_id, map_index=map_index)
                ti_mutation_hook(ti)
                created_counts[ti.operator] += 1
                yield ti

        creator = create_ti
    return creator
```
`TI.insert_mapping` sets the `queue` for you. The other path (`create_ti`,
taken when a mutation hook is configured) does not, so I can imagine that if
you spawn a couple of new TaskInstances that way, you will get the same error.
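
To make the difference between the two paths concrete, here is a toy model of it. Everything in it (`FakeTask`, `FakeTI`, the free-standing `insert_mapping` and `get_task_creator`) is a hypothetical stand-in and not Airflow code; in particular, `FakeTI` leaving `queue` unset is an assumption that mirrors my reading above, not something I verified against the real `TaskInstance` constructor.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable, Dict, Iterator, Optional, Tuple


@dataclass
class FakeTask:
    """Hypothetical stand-in for an operator."""
    task_id: str
    task_type: str = "FakeOperator"
    queue: str = "default"


@dataclass
class FakeTI:
    """Hypothetical stand-in for TaskInstance.

    ASSUMPTION: queue is left unset here to model the behaviour
    described in the comment; the real class may differ.
    """
    task_id: str
    map_index: int
    operator: str
    queue: Optional[str] = None


def insert_mapping(task: FakeTask, map_index: int) -> dict:
    """Toy version of the mapping path: copies queue from the task."""
    return {"task_id": task.task_id, "map_index": map_index, "queue": task.queue}


def get_task_creator(
    created_counts: Dict[str, int], hook: Callable, hook_is_noop: bool
) -> Callable[[FakeTask, Tuple[int, ...]], Iterator]:
    """Same branching shape as the snippet above, using the toy classes."""
    if hook_is_noop:
        def create_ti_mapping(task, indexes):
            created_counts[task.task_type] += 1
            for map_index in indexes:
                yield insert_mapping(task, map_index)
        return create_ti_mapping

    def create_ti(task, indexes):
        for map_index in indexes:
            ti = FakeTI(task.task_id, map_index, task.task_type)
            hook(ti)  # the hook could set ti.queue, but nothing else does
            created_counts[ti.operator] += 1
            yield ti
    return create_ti


counts: Dict[str, int] = defaultdict(int)
task = FakeTask("t1", queue="high_mem")

# Path taken when the hook is a no-op: rows carry the task's queue.
mapping_rows = list(get_task_creator(counts, lambda ti: None, True)(task, (0, 1)))

# Path taken when a hook is configured: queue stays unset in this toy model.
object_rows = list(get_task_creator(counts, lambda ti: None, False)(task, (0, 1)))
```

In this sketch the mapping rows end up with `queue="high_mem"` while the `FakeTI` objects keep `queue=None` unless the hook itself assigns one, which is the asymmetry I suspect is behind the error.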