TJaniF opened a new issue, #29238:
URL: https://github.com/apache/airflow/issues/29238
### Apache Airflow Provider(s)
amazon
### Versions of Apache Airflow Providers
apache-airflow-providers-amazon==7.2.0
### Apache Airflow version
2.5.1
### Operating System
macOS 13.1 (22C65)
### Deployment
Other Docker-based deployment
### Deployment details
Astro CLI (astro-runtime:7.2.0)
### What happened
When dynamically mapping over a task group containing the
S3CreateObjectOperator and using the mapped value within the `s3_key` or `data`
parameter of the operator, the operator uses the full list of dynamically
mapped inputs in every mapped task group instance.
Using the following mapped task group:
```python
@task_group()
def create_s3_files(num):
    @task
    def return_num_as_int(my_num):
        return my_num

    my_num_as_int = return_num_as_int(num)

    write_to_s3 = S3CreateObjectOperator(
        task_id="write_to_s3",
        aws_conn_id=AWS_CONN_ID,
        data=json.dumps(f"{my_num_as_int}"),
        replace=True,
        s3_key=f"{MY_S3_BUCKET}/{my_num_as_int}.txt",
    )

    my_num_as_int >> write_to_s3

tg_object = create_s3_files.expand(num=[0, 1, 2, 3, 4, 5])
```
This creates 6 mapped task group instances with 2 tasks each, as expected. The
`return_num_as_int` task returns the individual input for `num` as expected.
XCom for map index 3 in the screenshot:
<img width="1142" alt="Screenshot 2023-01-30 at 15 38 39"
src="https://user-images.githubusercontent.com/90063506/215507228-6055b2ba-8b31-4f0c-90bb-56f827703424.png">
The `write_to_s3` task renders the full list being mapped over for every one
of its six instances within the mapped task group, as shown in the screenshot
of the Rendered Template:
<img width="758" alt="Screenshot 2023-01-30 at 15 43 06"
src="https://user-images.githubusercontent.com/90063506/215508388-b1e4f275-1890-4259-9bc3-82d815be7ffd.png">
The file created in the S3 bucket matches the rendered template with the
name `[0, 1, 2, 3, 4, 5].txt` and the content `"[0, 1, 2, 3, 4, 5]"`.
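For reference, the observed filename and content are exactly what plain Python f-string interpolation of the whole input list would produce, which suggests each operator instance received the entire mapped list rather than a single element (a pure-Python illustration, no Airflow required; bucket name taken from this report):

```python
import json

nums = [0, 1, 2, 3, 4, 5]  # the full list passed to .expand()

# If the operator sees the whole list instead of one element per map index:
s3_key = f"s3://mytxtbucket/{nums}.txt"
data = json.dumps(f"{nums}")

print(s3_key)  # s3://mytxtbucket/[0, 1, 2, 3, 4, 5].txt
print(data)    # "[0, 1, 2, 3, 4, 5]"
```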
### Variations
**Changing the `replace` parameter**
When setting `replace=True` in the operator, one file is created and all task
instances within the mapped task group are marked as successful. When setting
`replace=False`, one mapped task instance succeeds, creating the same file
with the same content, while the other mapped task instances fail with the
following error:
```
[2023-01-30, 14:46:35 UTC] {taskinstance.py:1768} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airflow/providers/amazon/aws/operators/s3.py", line 399, in execute
    s3_hook.load_string(
  File "/usr/local/lib/python3.9/site-packages/airflow/providers/amazon/aws/hooks/s3.py", line 99, in wrapper
    return func(*bound_args.args, **bound_args.kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/providers/amazon/aws/hooks/s3.py", line 70, in wrapper
    return func(*bound_args.args, **bound_args.kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/providers/amazon/aws/hooks/s3.py", line 695, in load_string
    self._upload_file_obj(file_obj, key, bucket_name, replace, encrypt, acl_policy)
  File "/usr/local/lib/python3.9/site-packages/airflow/providers/amazon/aws/hooks/s3.py", line 765, in _upload_file_obj
    raise ValueError(f"The key {key} already exists.")
ValueError: The key [0, 1, 2, 3, 4, 5].txt already exists.
```
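This failure pattern follows from every mapped instance rendering the identical key: under `replace=False`, whichever instance uploads first wins and the remaining five hit the duplicate-key check (a pure-Python sketch of the collision, not Airflow code):

```python
nums = [0, 1, 2, 3, 4, 5]

# Each of the six mapped instances appears to render the same key:
keys = [f"s3://mytxtbucket/{nums}.txt" for _ in nums]

unique = set(keys)
print(len(unique))  # 1 -> one upload succeeds, five raise "already exists"
```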
**Using the mapped input directly**
When using the mapped input directly in the S3CreateObjectOperator
parameters, as follows:
```python
@task_group()
def create_s3_files(num):
    write_to_s3 = S3CreateObjectOperator(
        task_id="write_to_s3",
        aws_conn_id=AWS_CONN_ID,
        data=json.dumps(f"{num}"),
        replace=True,
        s3_key=f"{MY_S3_BUCKET}/{num}.txt",
    )

    write_to_s3

tg_object = create_s3_files.expand(num=[0, 1, 2, 3, 4, 5])
```
The resulting Rendered Template shows the full MappedArgument (matching the
single file created in S3):
<img width="758" alt="Screenshot 2023-01-30 at 15 51 05"
src="https://user-images.githubusercontent.com/90063506/215510413-d700998e-653b-44ba-8d49-8ccc95907d47.png">
**Mapping over the operator directly works as expected**
When mapping over the S3CreateObjectOperator directly, whether inside or
outside of a task group, the results are as expected: one file is created for
each mapped task instance.
```python
@task_group()
def create_s3_files(list_of_keys):
    write_to_s3 = S3CreateObjectOperator.partial(
        task_id="write_to_s3",
        aws_conn_id=AWS_CONN_ID,
        data=json.dumps("hi"),
        replace=True,
    ).expand(s3_key=list_of_keys)

create_s3_files(list_of_keys=[f"{MY_S3_BUCKET}/0.txt", f"{MY_S3_BUCKET}/1.txt"])
```
The code above creates 2 files, one named `0.txt` and the other named `1.txt`.
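Until the task group path behaves the same way, the direct-mapping pattern above could serve as a workaround for all six inputs by building the key list up front (the key construction itself is plain Python; the bucket name is the one used in this report):

```python
MY_S3_BUCKET = "s3://mytxtbucket"

nums = [0, 1, 2, 3, 4, 5]
list_of_keys = [f"{MY_S3_BUCKET}/{n}.txt" for n in nums]

print(list_of_keys[0])   # s3://mytxtbucket/0.txt
print(list_of_keys[-1])  # s3://mytxtbucket/5.txt
```

Passing this `list_of_keys` to `.expand(s3_key=...)`, as in the snippet above, yields one distinct key per mapped instance.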
### What you think should happen instead
The S3CreateObjectOperator should create one file in S3 for each element in
the input that the task group is mapped over, using only the element with the
matching map index.
Instead of creating `[0, 1, 2, 3, 4, 5].txt` six times, I would expect 6
files to be created, named `0.txt`, `1.txt`, `2.txt`, etc.
This would match the behavior of the TaskFlow task within the dynamically
mapped task group, which outputs `0`, `1`, `2`, etc. for each of its task
instances within the mapped task group instances. It would also match the
behavior seen when mapping over the S3CreateObjectOperator directly.
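The expected per-index rendering can be stated as a simple function (`render_key` is a hypothetical helper for illustration, not Airflow API):

```python
MY_S3_BUCKET = "s3://mytxtbucket"

def render_key(bucket: str, mapped_value: int) -> str:
    # What each mapped task group instance should render for s3_key
    return f"{bucket}/{mapped_value}.txt"

expected = [render_key(MY_S3_BUCKET, n) for n in [0, 1, 2, 3, 4, 5]]
print(expected[2])         # s3://mytxtbucket/2.txt
print(len(set(expected)))  # 6 distinct keys, one per map index
```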
### How to reproduce
Full DAG to reproduce below. It needs one S3 bucket and an AWS conn id.
Reproduction steps:
1) Add a valid `AWS_CONN_ID` and `MY_S3_BUCKET` after the package imports.
2) Run the DAG.
3) View the file created in the S3 bucket.
```python
import json

from pendulum import datetime

from airflow.decorators import dag, task, task_group
from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator

MY_S3_BUCKET = "s3://mytxtbucket"
AWS_CONN_ID = "aws_conn"


@dag(
    start_date=datetime(2022, 12, 1),
    schedule=None,
    catchup=False,
)
def S3_filename_bug_dag():
    @task_group()
    def create_s3_files(num):
        @task
        def return_num_as_int(my_num):
            return my_num

        my_num_as_int = return_num_as_int(num)

        write_to_s3 = S3CreateObjectOperator(
            task_id="write_to_s3",
            aws_conn_id=AWS_CONN_ID,
            data=json.dumps(f"{my_num_as_int}"),
            replace=True,
            s3_key=f"{MY_S3_BUCKET}/{my_num_as_int}.txt",
        )

        my_num_as_int >> write_to_s3

    tg_object = create_s3_files.expand(num=[0, 1, 2, 3, 4, 5])


S3_filename_bug_dag()
```
### Anything else
cc: @kentdanas
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)