TJaniF commented on issue #29238:
URL: https://github.com/apache/airflow/issues/29238#issuecomment-1409103664

   Hey @josh-fell 
   
   Passing in `num` directly creates one file with `MappedArgument` in the name 
and content:
   <img width="1172" alt="Screenshot 2023-01-30 at 18 47 38" src="https://user-images.githubusercontent.com/90063506/215554215-42b32b16-4bd6-45ba-b9ea-f06459de7f7a.png">
   
   ```python
   from airflow.decorators import dag, task_group, task
   from airflow.providers.amazon.aws.operators.s3 import (
       S3CreateObjectOperator
   )
   from pendulum import datetime
   
   import json
   
   MY_S3_BUCKET = "s3://mytxtbucket"
   AWS_CONN_ID = "aws_conn"
   
   @dag(
       start_date=datetime(2022, 12, 1),
       schedule=None,
       catchup=False,
   )
   def S3_filename_bug_dag_num_direct():
   
       @task_group()
       def create_s3_files(num):
   
           write_to_s3 = S3CreateObjectOperator(
               task_id="write_to_s3",
               aws_conn_id=AWS_CONN_ID,
               data=json.dumps(f"{num}"),
               replace=True,
               s3_key=f"{MY_S3_BUCKET}/{num}.txt"
           )
   
       tg_object = create_s3_files.expand(num=[0,1,2,3,4,5])
   
   
   S3_filename_bug_dag_num_direct()
   ```
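   For what it's worth, here is a minimal plain-Python sketch of what I think is happening (no Airflow involved; the `MappedArgument` class below is only a stand-in for Airflow's lazy reference, not the real class): the f-string stringifies the placeholder at parse time, before any per-map-index value exists, so its repr ends up in the key.
   
   ```python
   # Stand-in for Airflow's lazy reference to a mapped input; only the
   # repr is mimicked here, for illustration.
   class MappedArgument:
       def __repr__(self):
           return "MappedArgument(key='num')"
   
   num = MappedArgument()  # what the task group body holds at parse time
   
   # The f-string formats the placeholder immediately, before the values
   # 0..5 exist, so the repr leaks into the key.
   s3_key = f"s3://mytxtbucket/{num}.txt"
   print(s3_key)  # s3://mytxtbucket/MappedArgument(key='num').txt
   ```
   
   That would explain why exactly one file is created and why both its name and content show `MappedArgument`.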
   
   Omitting `json.dumps` leads to the same issue:
   
   <img width="1172" alt="Screenshot 2023-01-30 at 18 50 32" src="https://user-images.githubusercontent.com/90063506/215554835-0eec041a-6e7d-41bc-b0f0-bd76c89b2ea7.png">
   
   for:
   
   ```python
   from airflow.decorators import dag, task_group, task
   from airflow.providers.amazon.aws.operators.s3 import (
       S3CreateObjectOperator
   )
   from pendulum import datetime
   
   
   MY_S3_BUCKET = "s3://mytxtbucket"
   AWS_CONN_ID = "aws_conn"
   
   @dag(
       start_date=datetime(2022, 12, 1),
       schedule=None,
       catchup=False,
   )
   def S3_filename_bug_dag_no_json_dumps():
   
       @task_group()
       def create_s3_files(num):
   
           @task
           def return_num_as_int(my_num):
               return my_num
   
           my_num_as_int = return_num_as_int(num)
   
           write_to_s3 = S3CreateObjectOperator(
               task_id="write_to_s3",
               aws_conn_id=AWS_CONN_ID,
               data="Hi",
               replace=True,
               s3_key=f"{MY_S3_BUCKET}/a/{my_num_as_int}.txt"
           )
   
           my_num_as_int >> write_to_s3
   
       tg_object = create_s3_files.expand(num=[0,1,2,3,4,5])
   
   
   S3_filename_bug_dag_no_json_dumps()
   ```
   
   @Taragolis yeah, mapping tasks within mapped task groups seems to be a 
future thing at this point 😅 . 
   
   
   **More variations:**
   
   Using a traditional PythonOperator works as expected (each Python task in a 
mapped task group instance only uses one of the inputs):
   
   ```python
   from airflow.decorators import dag, task_group, task
   from airflow.operators.python import PythonOperator
   from pendulum import datetime
   
   def multiply_by_10(number):
       return number * 10
   
   @dag(
       start_date=datetime(2022, 12, 1),
       schedule=None,
       catchup=False,
   )
   def python_map(): 
   
       @task_group()
       def say_things(num):
   
           @task 
           def turn_num_to_int(num):
               return num 
   
           int_num = turn_num_to_int(num)
   
           t1 = PythonOperator(
               task_id="t1",
               python_callable=multiply_by_10,
               op_args=[int_num] 
           )
   
           int_num >> t1 
   
       tg_object = say_things.expand(num=[0,1,2,3,4,5])
   
   python_map()
   ```
   
   But when using `op_args=["{{ ti.xcom_pull(task_ids=['turn_num_to_int'], 
key='return_value') }}"]` instead I get:
   
   ```
   [2023-01-30, 18:11:44 UTC] {python.py:177} INFO - Done. Returned value was: [][][][][][][][][][]
   ```
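   
   If I understand `xcom_pull` correctly, passing `task_ids` as a *list* makes it return a list of values, so when no XCom matches for that task instance the rendered result is an empty list. A small stand-in to illustrate the list-in/list-out shape (the `fake_xcom_pull` helper is hypothetical, not Airflow code):
   
   ```python
   # Hypothetical stand-in for xcom_pull's list-in/list-out behavior:
   # a list of task_ids yields a list of the matching values.
   def fake_xcom_pull(task_ids, store):
       return [store[t] for t in task_ids if t in store]
   
   # No matching XCom for this task instance -> empty list, which
   # renders as "[]" in the template output.
   print(fake_xcom_pull(task_ids=["turn_num_to_int"], store={}))  # []
   
   # A match yields a one-element list, not the bare value.
   print(fake_xcom_pull(task_ids=["turn_num_to_int"],
                        store={"turn_num_to_int": 3}))  # [3]
   ```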
   
   Switching out the PythonOperator for a BashOperator leads to an error that looks (on the surface) similar to the one with the S3CopyObjectOperator:
   
   ```python
   from airflow.decorators import dag, task_group, task
   from airflow.operators.bash import BashOperator
   from pendulum import datetime
   
   import time
   
   @dag(
       start_date=datetime(2022, 12, 1),
       schedule=None,
       catchup=False,
   )
   def bash_map(): 
   
       @task_group()
       def say_things(num):
   
           @task 
           def turn_num_to_int(num):
               time.sleep(5)
               return num 
   
           int_num = turn_num_to_int(num)
   
           t2 = BashOperator(
               task_id="t2",
               bash_command=f"echo {int_num}"
           )
   
           int_num >> t2
   
       tg_object = say_things.expand(num=[0,1,2,3,4,5])
   
   
   bash_map()
   ```
   
   Prints:
   ```
   [2023-01-30, 18:15:35 UTC] {subprocess.py:63} INFO - Tmp dir root location: /tmp
   [2023-01-30, 18:15:35 UTC] {subprocess.py:75} INFO - Running command: ['/bin/bash', '-c', 'echo [0, 1, 2, 3, 4, 5]']
   [2023-01-30, 18:15:35 UTC] {subprocess.py:86} INFO - Output:
   [2023-01-30, 18:15:35 UTC] {subprocess.py:93} INFO - [0, 1, 2, 3, 4, 5]
   [2023-01-30, 18:15:35 UTC] {subprocess.py:97} INFO - Command exited with return code 0
   ```
   
   for every map index.
   
   Same when using 
   
   ```
   bash_command=f"echo $MYNUM",
   env={"MYNUM": str(int_num)}
   ```
   
   in the operator.
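   
   My working theory, as a plain-Python sketch: the f-string (and `str(int_num)` in `env`) fixes the command string once, when the DAG file is parsed and the input is still the whole unresolved list, whereas a Jinja template string is kept as-is and only rendered per task instance, once the single per-map-index value exists. Nothing below is Airflow's actual code, just the shape of the difference (the `render` helper is hypothetical):
   
   ```python
   values = [0, 1, 2, 3, 4, 5]
   
   # Eager: the f-string bakes the whole input into the command string
   # once, at "parse time".
   eager_command = f"echo {values}"
   
   # Deferred: the template survives parsing untouched and is rendered
   # later, per task instance.
   deferred_command = "echo {{ ti.xcom_pull(task_ids='turn_num_to_int') }}"
   
   def render(template, value):
       # hypothetical per-instance rendering step
       return template.replace(
           "{{ ti.xcom_pull(task_ids='turn_num_to_int') }}", str(value)
       )
   
   print(eager_command)                # echo [0, 1, 2, 3, 4, 5]
   print(render(deferred_command, 3))  # echo 3
   ```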
   
   So the issue is not limited to the S3CopyObjectOperator, but it also does not affect all traditional operators.
   
   Is it possible that the mapped input is evaluated differently when it is passed into a Python function (whether a decorated task or a traditional PythonOperator) than when it is passed directly as an operator argument?

