TJaniF commented on issue #29238: URL: https://github.com/apache/airflow/issues/29238#issuecomment-1409103664
Hey @josh-fell Passing in `num` directly creates one file with `MappedArgument` in the name and content:

<img width="1172" alt="Screenshot 2023-01-30 at 18 47 38" src="https://user-images.githubusercontent.com/90063506/215554215-42b32b16-4bd6-45ba-b9ea-f06459de7f7a.png">

```python
from airflow.decorators import dag, task_group, task
from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator
from pendulum import datetime
import json

MY_S3_BUCKET = "s3://mytxtbucket"
AWS_CONN_ID = "aws_conn"


@dag(
    start_date=datetime(2022, 12, 1),
    schedule=None,
    catchup=False,
)
def S3_filename_bug_dag_num_direct():
    @task_group()
    def create_s3_files(num):
        write_to_s3 = S3CreateObjectOperator(
            task_id="write_to_s3",
            aws_conn_id=AWS_CONN_ID,
            data=json.dumps(f"{num}"),
            replace=True,
            s3_key=f"{MY_S3_BUCKET}/{num}.txt",
        )

    tg_object = create_s3_files.expand(num=[0, 1, 2, 3, 4, 5])


S3_filename_bug_dag_num_direct()
```

Not using `json.dumps` ends with the same issue:

<img width="1172" alt="Screenshot 2023-01-30 at 18 50 32" src="https://user-images.githubusercontent.com/90063506/215554835-0eec041a-6e7d-41bc-b0f0-bd76c89b2ea7.png">

for:

```python
from airflow.decorators import dag, task_group, task
from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator
from pendulum import datetime

MY_S3_BUCKET = "s3://mytxtbucket"
AWS_CONN_ID = "aws_conn"


@dag(
    start_date=datetime(2022, 12, 1),
    schedule=None,
    catchup=False,
)
def S3_filename_bug_dag_no_json_dumps():
    @task_group()
    def create_s3_files(num):
        @task
        def return_num_as_int(my_num):
            return my_num

        my_num_as_int = return_num_as_int(num)

        write_to_s3 = S3CreateObjectOperator(
            task_id="write_to_s3",
            aws_conn_id=AWS_CONN_ID,
            data="Hi",
            replace=True,
            s3_key=f"{MY_S3_BUCKET}/a/{my_num_as_int}.txt",
        )

        my_num_as_int >> write_to_s3

    tg_object = create_s3_files.expand(num=[0, 1, 2, 3, 4, 5])


S3_filename_bug_dag_no_json_dumps()
```

@Taragolis yeah, mapping tasks within mapped task groups seems to be a future thing at this point 😅
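A plain-Python sketch (no Airflow involved; the `MappedArgument` class below is a made-up stand-in, not Airflow's real class) of one plausible mechanism for the placeholder filename: the f-string in `s3_key` stringifies the lazy reference when the task group body runs at DAG-parse time, before any per-map-index value exists, so the placeholder repr gets baked into the key.

```python
class MappedArgument:
    """Toy stand-in for a lazy mapped-argument reference (assumption:
    stringifying it yields a placeholder, not a resolved value)."""

    def __str__(self):
        return "MappedArgument(...)"


num = MappedArgument()

# The f-string calls str(num) immediately, long before mapping resolution,
# so the placeholder ends up in the key for every map index:
s3_key = f"s3://mytxtbucket/{num}.txt"
print(s3_key)  # s3://mytxtbucket/MappedArgument(...).txt
```

If that is what happens, it would explain why every mapped task group instance writes to the same `MappedArgument`-named file.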
**More variations:** Using a traditional PythonOperator works as expected (each Python task in a mapped task group instance only uses one of the inputs):

```python
from airflow.decorators import dag, task_group, task
from airflow.operators.python import PythonOperator
from pendulum import datetime


def multiply_by_10(number):
    return number * 10


@dag(
    start_date=datetime(2022, 12, 1),
    schedule=None,
    catchup=False,
)
def python_map():
    @task_group()
    def say_things(num):
        @task
        def turn_num_to_int(num):
            return num

        int_num = turn_num_to_int(num)

        t1 = PythonOperator(
            task_id="t1",
            python_callable=multiply_by_10,
            op_args=[int_num],
        )

        int_num >> t1

    tg_object = say_things.expand(num=[0, 1, 2, 3, 4, 5])


python_map()
```

But when using `op_args=["{{ ti.xcom_pull(task_ids=['turn_num_to_int'], key='return_value') }}"]` instead I get:

```
[2023-01-30, 18:11:44 UTC] {python.py:177} INFO - Done. Returned value was: [][][][][][][][][][]
```

Switching out the Python for a BashOperator leads to a (on the surface) similar-looking error as with the S3CopyObjectOperator:

```python
from airflow.decorators import dag, task_group, task
from airflow.operators.bash import BashOperator
from pendulum import datetime
import time


@dag(
    start_date=datetime(2022, 12, 1),
    schedule=None,
    catchup=False,
)
def bash_map():
    @task_group()
    def say_things(num):
        @task
        def turn_num_to_int(num):
            time.sleep(5)
            return num

        int_num = turn_num_to_int(num)

        t2 = BashOperator(
            task_id="t2",
            bash_command=f"echo {int_num}",
        )

        int_num >> t2

    tg_object = say_things.expand(num=[0, 1, 2, 3, 4, 5])


bash_map()
```

Prints:

```
[2023-01-30, 18:15:35 UTC] {subprocess.py:63} INFO - Tmp dir root location: /tmp
[2023-01-30, 18:15:35 UTC] {subprocess.py:75} INFO - Running command: ['/bin/bash', '-c', 'echo [0, 1, 2, 3, 4, 5]']
[2023-01-30, 18:15:35 UTC] {subprocess.py:86} INFO - Output:
[2023-01-30, 18:15:35 UTC] {subprocess.py:93} INFO - [0, 1, 2, 3, 4, 5]
[2023-01-30, 18:15:35 UTC] {subprocess.py:97} INFO - Command exited with return code 0
```

for every map index. Same when using

```python
bash_command="echo $MYNUM",
env={"MYNUM": str(int_num)},
```

in the operator.

So the issue is not limited to the S3CopyObjectOperator, but it also does not affect all traditional operators. Is it possible that the mapped input is evaluated differently when passed to a Python function (whether decorated or via a traditional PythonOperator) than when passed as an operator input?
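A plain-Python sketch of that hypothesis (every name below is made up for illustration; nothing here is Airflow API): a callable's argument appears to be resolved per map index, handing each task group instance its own value, while a string rendered at template time without a map index scope appears to collect the return values of all mapped instances at once.

```python
# One stored return_value per map index, as the mapped
# turn_num_to_int tasks would produce for num=[0..5]:
xcom_return_values = {i: i for i in range(6)}


def resolve_for_callable(map_index):
    """Per-index resolution: each instance sees only its own value."""
    return xcom_return_values[map_index]


def resolve_for_template():
    """Unscoped pull: the rendered string sees every instance's value."""
    return list(xcom_return_values.values())


print(resolve_for_callable(3))  # 3
print(resolve_for_template())   # [0, 1, 2, 3, 4, 5]
```

That difference would match both observations: `op_args=[int_num]` multiplies a single number, while `echo {int_num}` prints the whole list `[0, 1, 2, 3, 4, 5]` in every mapped instance.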
