Deelane opened a new issue, #28526:
URL: https://github.com/apache/airflow/issues/28526

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   Airflow version: 2.3.3
   
   We are starting to transition to task decorators from Operators in our dags 
and I have ran into a use case that I could not find any resources for, and I 
am unsure if it is a bug.
   
   I am attempting to invoke a task multiple times via dynamic mapping like 
this:
   
   ```
   @task_group
   def some_task_group():
     if condition: 
             some_mapped_task.partial(
                 partial_arg=some_arg
             ).expand(
                 expand_list_arg=some_list)
             ) 
   ```
   
   
   Here, `some_mapped_task` is the mapped task. Within `some_mapped_task` 
exists a function `execute_bash_function` that invokes a regular `BashOperator` 
and passes a task pool to run on:
   
   ```
   @task
   def some_mapped_task():
     task_pool=get_task_pool()
     execute_bash_function(task_pool=task_pool)
   ```
   
   ```
   def execute_bash_function(task_pool: str):
       cmd = f'some_bash_command`
       context = get_current_context()
       BashOperator(
           task_id=f'some_task_id',
           bash_command=cmd,
           pool=task_pool
       ).execute(context)
   ```
   
   So we have a mapped task invoked multiple times via expand, and the mapped 
task itself attempts to call a function that executes a `BashOperator`. 
However, the `BashOperator` is not using the `task_pool` passed in and is 
instead using the default task pool that the parent task `some_mapped_task` 
uses. 
   
   I am not entirely certain if this is due to my lack of understanding or a 
bug with airflow. When assigning the `BashOperator` to a variable `operator` 
and printing `operator.pool`, the correct pool is printed. However, the default 
pool is used.
   
   ### What you think should happen instead
   
   If this is indeed a bug, then I believe the BashOperator should use the task 
pool passed in, and not the task pool of the parent task.
   
   ### How to reproduce
   
   - Create a `@task_group` 
   - Create a `@task`
   - Create a function that executes a `BashOperator`
    - Execute the `BashOperator` function within the `@task` 
    - Execute the `@task` within the task group, using 
`task_name.partial().expand()`
   
   ### Operating System
   
   MacOs Ventura 13.0
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow==2.3.3
   apache-airflow-client==2.2.0
   apache-airflow-providers-google==8.1.0
   
   ### Deployment
   
   Composer
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to