tmaurin commented on issue #56459:
URL: https://github.com/apache/airflow/issues/56459#issuecomment-3467702312

   Hello @ashb,
   I don't know where you are with it, but in case it helps: I think this
   issue might actually be caused by a task group combined with a templated
   field that references another templated field, rather than by a task
   group combined with kubernetes_cmd specifically.
   
   I think this DAG showcases the issue:
   ```python
   from airflow.sdk import BaseOperator, dag, task_group, task
   
   from datetime import datetime
   
   
   class CustomOperator(BaseOperator):
       template_fields = (
           "raw_input_file",
           "input_file",
       )
   
       def __init__(self, input_file, **kwargs):
           self.raw_input_file = input_file
   
           # This line causes the error with MappedArgument inside of taskGroup in Airflow 3.x
           self.input_file = "{{ task.raw_input_file }}"
           super().__init__(**kwargs)
   
       def execute(self, context):
           # Always print the raw file name
           print(f"Raw file: {self.raw_input_file}")
           # Print the raw file name if task is mapped directly
           # But print MappedArgument(_input=DictOfListsExpandInput(value={'input_file': ['s3://bucket/path/to/file1.csv', 's3://bucket/path/to/file2.csv', 's3://bucket/path/to/file3.csv']}), _key='input_file')
           # if task is mapped inside of taskGroup
           print(f"Derived file: {self.input_file}")
           return self.raw_input_file
   
   
   @dag(
       dag_id="test_mapped_argument_split_issue",
       start_date=datetime(2025, 1, 1),
       schedule=None,
       catchup=False,
   )
   def test_mapped_argument_split():
       """Reproduce the MappedArgument split error"""
       input_files = [
           "s3://bucket/path/to/file1.csv",
           "s3://bucket/path/to/file2.csv",
           "s3://bucket/path/to/file3.csv",
       ]
   
       @task_group()
       def test_group(
               input_file: str,
       ) -> CustomOperator:
           return CustomOperator(input_file=input_file, task_id="custom_operator")
       test_group.expand(input_file=input_files)
       CustomOperator.partial(task_id="custom_operator").expand(input_file=input_files)
   
   
   test_dag = test_mapped_argument_split()
   ```
   This is the resulting graph:
   
   <img width="397" height="459" alt="Image" src="https://github.com/user-attachments/assets/484d7af1-9ab5-4abe-b21e-96066979bcbf" />
   
   The task mapped through test_group logs something like this:
   ```
   Raw file: s3://bucket/path/to/file3.csv
   Derived file: MappedArgument(_input=DictOfListsExpandInput(value={'input_file': ['s3://bucket/path/to/file1.csv', 's3://bucket/path/to/file2.csv', 's3://bucket/path/to/file3.csv']}), _key='input_file')
   ```
   
   While the task mapped directly logs something like this:
   ```
   Raw file: s3://bucket/path/to/file3.csv
   Derived file: s3://bucket/path/to/file3.csv
   ```
   So mapping the task directly works as expected (as in 2.x), while mapping
   it through a task group does not.
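
In case it is useful in the meantime: since `raw_input_file` renders correctly in both logs above, a possible workaround might be to stop templating the second field against `task` and derive it inside `execute()` instead. Below is a minimal sketch using a plain-Python stand-in class so it runs without Airflow. `DerivedAtExecute` and `derive_input_file` are hypothetical names, and this is untested against a real Airflow 3.x deployment:

```python
def derive_input_file(raw_input_file: str) -> str:
    # Hypothetical helper: in the reproduction the derived value is simply
    # the raw value, so it is returned unchanged here.
    return raw_input_file


class DerivedAtExecute:
    # Stand-in for CustomOperator (assumption: a real operator would still
    # subclass BaseOperator). Only the raw field is declared as templated.
    template_fields = ("raw_input_file",)

    def __init__(self, input_file):
        self.raw_input_file = input_file

    def execute(self, context):
        # Derive the second value from the already-rendered raw field
        # instead of using a "{{ task.raw_input_file }}" template.
        input_file = derive_input_file(self.raw_input_file)
        print(f"Raw file: {self.raw_input_file}")
        print(f"Derived file: {input_file}")
        return input_file


# Simulate a task whose templated field already holds one mapped value.
op = DerivedAtExecute(input_file="s3://bucket/path/to/file3.csv")
result = op.execute(context={})
```

This only sidesteps the self-referencing template; it does not explain why the task-group case renders differently.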


