tmaurin commented on issue #56459:
URL: https://github.com/apache/airflow/issues/56459#issuecomment-3467702312
Hello @ashb,
I don't know how far along you are with this, but in case it helps: I think
this issue is actually caused by a task group combined with a template field
that references another template field, rather than by a task group combined
with kubernetes_cmd specifically.
This DAG reproduces the issue:
```python
from airflow.sdk import BaseOperator, dag, task_group, task
from datetime import datetime


class CustomOperator(BaseOperator):
    template_fields = (
        "raw_input_file",
        "input_file",
    )

    def __init__(self, input_file, **kwargs):
        self.raw_input_file = input_file
        # This line causes the error with MappedArgument inside of taskGroup
        # in Airflow 3.x
        self.input_file = "{{ task.raw_input_file }}"
        super().__init__(**kwargs)

    def execute(self, context):
        # Always print the raw file name
        print(f"Raw file: {self.raw_input_file}")
        # Print the raw file name if task is mapped directly
        # But print
        # MappedArgument(_input=DictOfListsExpandInput(value={'input_file':
        # ['s3://bucket/path/to/file1.csv', 's3://bucket/path/to/file2.csv',
        # 's3://bucket/path/to/file3.csv']}), _key='input_file')
        # if task is mapped inside of taskGroup
        print(f"Derived file: {self.input_file}")
        return self.raw_input_file


@dag(
    dag_id="test_mapped_argument_split_issue",
    start_date=datetime(2025, 1, 1),
    schedule=None,
    catchup=False,
)
def test_mapped_argument_split():
    """Reproduce the MappedArgument split error"""
    input_files = [
        "s3://bucket/path/to/file1.csv",
        "s3://bucket/path/to/file2.csv",
        "s3://bucket/path/to/file3.csv",
    ]

    @task_group()
    def test_group(
        input_file: str,
    ) -> CustomOperator:
        return CustomOperator(input_file=input_file, task_id="custom_operator")

    # Case 1: mapping through the task group (shows the issue)
    test_group.expand(input_file=input_files)
    # Case 2: mapping the operator directly (works as expected)
    CustomOperator.partial(task_id="custom_operator").expand(input_file=input_files)


test_dag = test_mapped_argument_split()
```
This is the resulting graph:
<img width="397" height="459" alt="Image"
src="https://github.com/user-attachments/assets/484d7af1-9ab5-4abe-b21e-96066979bcbf"
/>
The task mapped through test_group logs something like this:
```
Raw file: s3://bucket/path/to/file3.csv
Derived file: MappedArgument(_input=DictOfListsExpandInput(value={'input_file': ['s3://bucket/path/to/file1.csv', 's3://bucket/path/to/file2.csv', 's3://bucket/path/to/file3.csv']}), _key='input_file')
```
The task mapped directly, on the other hand, logs something like this:
```
Raw file: s3://bucket/path/to/file3.csv
Derived file: s3://bucket/path/to/file3.csv
```
So mapping the task directly works as expected (the same as in 2.x), while
mapping it through a task group does not.
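To make the suspected mechanism easier to discuss, here is a toy model in plain Python. It is not Airflow's rendering code: `Placeholder`, `FakeTask` and `render` are hypothetical names made up for this sketch, and the assumption that the attribute is still an unresolved placeholder at render time in the task group case is my guess from the logs, not something I have confirmed in the source.

```python
import re
from dataclasses import dataclass


@dataclass
class Placeholder:
    """Hypothetical stand-in for an unresolved mapped argument."""

    key: str


class FakeTask:
    """Hypothetical stand-in for an operator with two template fields."""

    def __init__(self, input_file):
        self.raw_input_file = input_file
        # Second field refers to the first one through a template.
        self.input_file = "{{ task.raw_input_file }}"


# Matches only the simple "{{ task.<attr> }}" form used in this sketch.
_PATTERN = re.compile(r"\{\{\s*task\.(\w+)\s*\}\}")


def render(template: str, task: FakeTask) -> str:
    """Replace each {{ task.<attr> }} with str() of that task attribute."""
    return _PATTERN.sub(lambda m: str(getattr(task, m.group(1))), template)


# Direct mapping: the attribute already holds the concrete value.
direct = FakeTask("s3://bucket/path/to/file3.csv")
direct_result = render(direct.input_file, direct)

# Task group mapping (assumed): the attribute still holds a placeholder,
# so the template renders the placeholder's repr instead of the file name.
grouped = FakeTask(Placeholder(key="input_file"))
grouped_result = render(grouped.input_file, grouped)

print(f"Derived file (direct): {direct_result}")
print(f"Derived file (grouped): {grouped_result}")
```

If this model is roughly right, the difference between the two cases would come down to whether `raw_input_file` has been resolved to its concrete value before `input_file` is rendered.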