Taragolis commented on issue #29238:
URL: https://github.com/apache/airflow/issues/29238#issuecomment-1408954835
@TJaniF I think it should be something like this:
```python
from airflow.decorators import dag, task_group, task
from airflow.providers.amazon.aws.operators.s3 import (
    S3CreateObjectOperator
)
from pendulum import datetime
import json

MY_S3_BUCKET = "s3://mytxtbucket"
AWS_CONN_ID = "aws_conn"


@dag(
    start_date=datetime(2022, 12, 1, tz="UTC"),
    schedule=None,
    catchup=False,
)
def S3_filename_bug_dag():

    @task_group()
    def create_s3_files(num):

        @task
        def return_num_as_int(my_num):
            return my_num

        @task
        def create_copy_kwargs(my_num):
            return {
                "data": json.dumps(my_num),
                "s3_key": f"{MY_S3_BUCKET}/{my_num}.txt",
            }

        my_num_as_int = return_num_as_int(num)
        kwargs = create_copy_kwargs(my_num_as_int)

        S3CreateObjectOperator.partial(
            task_id="write_to_s3",
            aws_conn_id=AWS_CONN_ID,
            replace=True
        ).expand_kwargs(kwargs)

    create_s3_files.expand(num=[0, 1, 2, 3, 4, 5])


S3_filename_bug_dag()
```
Unfortunately, when I tried it I got:
```console
Broken DAG: [/files/dags/issue_29238.py] Traceback (most recent call last):
  File "<attrs generated init airflow.models.mappedoperator.MappedOperator>", line 27, in __init__
    self.__attrs_post_init__()
  File "/opt/airflow/airflow/models/mappedoperator.py", line 309, in __attrs_post_init__
    raise NotImplementedError("operator expansion in an expanded task group is not yet supported")
NotImplementedError: operator expansion in an expanded task group is not yet supported
```
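Since the error comes from expanding an operator inside an expanded task group, one possible way around it might be to drop the mapped task group, build the full list of kwargs up front, and expand the operator once at the DAG level. The sketch below is untested against Airflow and only shows the kwargs-building step in plain Python; the helper name `build_s3_kwargs` is hypothetical and not part of the original DAG.

```python
import json

MY_S3_BUCKET = "s3://mytxtbucket"


def build_s3_kwargs(nums):
    """Hypothetical helper: return one kwargs dict per S3 object to write."""
    return [
        {"data": json.dumps(n), "s3_key": f"{MY_S3_BUCKET}/{n}.txt"}
        for n in nums
    ]


# Assumption (not verified here): inside the DAG function, the helper could be
# wrapped with @task and its result passed straight to the operator, with no
# expanded task group involved:
#
#   S3CreateObjectOperator.partial(
#       task_id="write_to_s3", aws_conn_id=AWS_CONN_ID, replace=True
#   ).expand_kwargs(task(build_s3_kwargs)([0, 1, 2, 3, 4, 5]))
```

This keeps a single level of mapping, so each dict in the list becomes one mapped `write_to_s3` task instance.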