TJaniF opened a new issue, #29238:
URL: https://github.com/apache/airflow/issues/29238

   ### Apache Airflow Provider(s)
   
   amazon
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-amazon==7.2.0
   
   ### Apache Airflow version
   
   2.5.1
   
   ### Operating System
   
   macOS 13.1 (22C65)
   
   ### Deployment
   
   Other Docker-based deployment
   
   ### Deployment details
   
   Astro CLI (astro-runtime:7.2.0) 
   
   ### What happened
   
   When dynamically mapping over a task group containing the 
S3CopyObjectOperator and using the mapped value within the `s3_key` or `data` 
parameter of the operator; the operator will use the full list of dynamically 
mapped inputs in every mapped task group instance. 
   
   Using the following mapped task group:
   
   ```python
       @task_group()
       def create_s3_files(num):
   
           @task
           def return_num_as_int(my_num):
               return my_num
   
           my_num_as_int = return_num_as_int(num)
   
           write_to_s3 = S3CreateObjectOperator(
               task_id="write_to_s3",
               aws_conn_id=AWS_CONN_ID,
               data=json.dumps(f"{my_num_as_int}"),
               replace=True,
               s3_key=f"{MY_S3_BUCKET}/{my_num_as_int}.txt"
           )
   
           my_num_as_int >> write_to_s3
   
       tg_object = create_s3_files.expand(num=[0,1,2,3,4,5])
   ```
   
   Creates 6 mapped task group instances with 2 tasks each as expected. The 
`return_num_as_int` task returns the individual inputs for `num` as expected. 
XCom for map index 3 in the screenshot:
   
   <img width="1142" alt="Screenshot 2023-01-30 at 15 38 39" 
src="https://user-images.githubusercontent.com/90063506/215507228-6055b2ba-8b31-4f0c-90bb-56f827703424.png";>
   
   The `write_to_S3` task will render the full list that is being mapped over 
for everyone of its six instances within the mapped task group as shown in the 
screenshot of the Rendered Template:
   
   <img width="758" alt="Screenshot 2023-01-30 at 15 43 06" 
src="https://user-images.githubusercontent.com/90063506/215508388-b1e4f275-1890-4259-9bc3-82d815be7ffd.png";>
   
   The file created in the S3 bucket matches the rendered template with the 
name `[0, 1, 2, 3, 4, 5].txt` and the content `"[0, 1, 2, 3, 4, 5]"`. 
   
   
   ### Variations
   
   **Changing the replace parameter**
   
   When setting `replace=True` in the operator, one file is created, all task 
instances within the mapped task group are marked as successful. When setting 
`replace=False` 1 mapped task instance is successful creating the same file 
with the same content with the other mapped task instances failing with the 
following error:
   
   ```
   [2023-01-30, 14:46:35 UTC] {taskinstance.py:1768} ERROR - Task failed with 
exception
   Traceback (most recent call last):
     File 
"/usr/local/lib/python3.9/site-packages/airflow/providers/amazon/aws/operators/s3.py",
 line 399, in execute
       s3_hook.load_string(
     File 
"/usr/local/lib/python3.9/site-packages/airflow/providers/amazon/aws/hooks/s3.py",
 line 99, in wrapper
       return func(*bound_args.args, **bound_args.kwargs)
     File 
"/usr/local/lib/python3.9/site-packages/airflow/providers/amazon/aws/hooks/s3.py",
 line 70, in wrapper
       return func(*bound_args.args, **bound_args.kwargs)
     File 
"/usr/local/lib/python3.9/site-packages/airflow/providers/amazon/aws/hooks/s3.py",
 line 695, in load_string
       self._upload_file_obj(file_obj, key, bucket_name, replace, encrypt, 
acl_policy)
     File 
"/usr/local/lib/python3.9/site-packages/airflow/providers/amazon/aws/hooks/s3.py",
 line 765, in _upload_file_obj
       raise ValueError(f"The key {key} already exists.")
   ValueError: The key [0, 1, 2, 3, 4, 5].txt already exists.
   ```
   
   **Using the mapped input directly**
   
   When using the mapped input directly in the S3CopyObjectOperator parameter 
as such:
   
   ```python
       @task_group()
       def create_s3_files(num):
   
           write_to_s3 = S3CreateObjectOperator(
               task_id="write_to_s3",
               aws_conn_id=AWS_CONN_ID,
               data=json.dumps(f"{num}"),
               replace=True,
               s3_key=f"{MY_S3_BUCKET}/{num}.txt"
           )
   
           write_to_s3
   
       tg_object = create_s3_files.expand(num=[0,1,2,3,4,5])
   ```
   
   The resulting Rendered Template uses the full MappedArgument (this matches 
the one file created in S3):
   
   <img width="758" alt="Screenshot 2023-01-30 at 15 51 05" 
src="https://user-images.githubusercontent.com/90063506/215510413-d700998e-653b-44ba-8d49-8ccc95907d47.png";>
   
   
   **Mapping over the operator directly works as expected**
   
   When mapping over the S3CopyObjectOperator directly, both within or outside 
of a task group. The results are as expected one file created for each mapped 
task instance. 
   
   ```python
       @task_group()
       def create_s3_files(list_of_keys):
   
           write_to_s3 = S3CreateObjectOperator.partial(
               task_id="write_to_s3",
               aws_conn_id=AWS_CONN_ID,
               data=json.dumps("hi"),
               replace=True,
           ).expand(s3_key=list_of_keys)
   
       
create_s3_files(list_of_keys=[f"{MY_S3_BUCKET}/0.txt",f"{MY_S3_BUCKET}/1.txt"])
   ```
   
   The code above creates 2 files. One named `0.txt` the other named `1.txt`.
   
   ### What you think should happen instead
   
   The S3CopyObjectOperator should create one file in S3 for each element in 
the input that the task group is being mapped over only using the element with 
the correct map index. 
   
   Instead of creating `[0, 1, 2, 3, 4, 5].txt` six times I would expect 6 
files to be created named `0.txt`, `1.txt`, `2.txt` etc... 
   
   This behavior would match the behavior of the task flow task within the 
dyncamically matched task group which has the outputs `0`, `1`, `2` etc for 
each of its task instances within the mapped task group instances. It would 
also match the behavior seen when mapping over the S3CopyObjectOperator 
directly.
   
   ### How to reproduce
   
   Full DAG to reproduce. Needs 1 S3 bucket and an AWS conn id. 
   
   Reproduction steps:
   
   1) Add a valid `AWS_CONN_ID` and `MY_S3_BUCKET` after the package imports.
   2) Run the DAG.
   3) View the file created in the S3 bucket.
   
   ```python
   from airflow.decorators import dag, task_group, task
   from airflow.providers.amazon.aws.operators.s3 import (
       S3CreateObjectOperator
   )
   from pendulum import datetime
   
   import json
   
   MY_S3_BUCKET = "s3://mytxtbucket"
   AWS_CONN_ID = "aws_conn"
   
   @dag(
       start_date=datetime(2022, 12, 1),
       schedule=None,
       catchup=False,
   )
   def S3_filename_bug_dag():
   
       @task_group()
       def create_s3_files(num):
   
           @task
           def return_num_as_int(my_num):
               return my_num
   
           my_num_as_int = return_num_as_int(num)
   
           write_to_s3 = S3CreateObjectOperator(
               task_id="write_to_s3",
               aws_conn_id=AWS_CONN_ID,
               data=json.dumps(f"{my_num_as_int}"),
               replace=True,
               s3_key=f"{MY_S3_BUCKET}/{my_num_as_int}.txt"
           )
   
           my_num_as_int >> write_to_s3
   
       tg_object = create_s3_files.expand(num=[0,1,2,3,4,5])
   
   
   S3_filename_bug_dag()
   ```
   
   ### Anything else
   
   cc: @kentdanas 
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to