tmaurin commented on issue #56459:
URL: https://github.com/apache/airflow/issues/56459#issuecomment-3469713714
Small update:
I'm not 100% it is the same thing, but I found out you can reproduce the
issue by using an fstring instead of referencing another variable.
In case it can be usefull starting from above example for the TaskGroup:
```
class CustomOperator(BaseOperator):
template_fields = (
"input_file",
)
def __init__(self, input_file, **kwargs):
# This line causes the error with MappedArgument inside of taskGroup
in Airflow 3.x
self.input_file = f"toto/{input_file}"
super().__init__(**kwargs)
def execute(self, context: Context):
print(f"file: {self.input_file}")
return self.input_file
@task()
def transmit_param(val: str) -> str:
return val
@dag(
dag_id="test_mapped_argument_split_issue3",
start_date=datetime(2025, 1, 1),
schedule=None,
catchup=False,
)
def test_mapped_argument_split():
"""Reproduce the MappedArgument split error"""
input_files = [
"s3://bucket/path/to/file1.csv",
"s3://bucket/path/to/file2.csv",
"s3://bucket/path/to/file3.csv",
]
@task_group()
def test_group(
input_file: str,
) -> CustomOperator:
return CustomOperator(input_file=f"{input_file}",
task_id="custom_operator")
test_group.expand(input_file=input_files)
```
Will print:
```
file: toto/MappedArgument(_input=DictOfListsExpandInput(value={'input_file':
['s3://bucket/path/to/file1.csv', 's3://bucket/path/to/file2.csv',
's3://bucket/path/to/file3.csv']}), _key='input_file')
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]