ashb commented on PR #21877:
URL: https://github.com/apache/airflow/pull/21877#issuecomment-1133409500

   I've just noticed that this change breaks the follow-on mini-scheduler for
   mapped tasks. I guess that code path wasn't sufficiently unit-tested.
   
   DAG that reproduces the problem:
   
   ```
   # Reproduction DAG: a mapped task whose expansion input comes from the XCom
   # of an upstream S3ListOperator. Running it triggers the mini-scheduler
   # AttributeError shown in the error output further down in this message.
   import csv
   import io
   import os
   import json
   from datetime import datetime
   
   from airflow import DAG
   from airflow.decorators import task
   from airflow.models.xcom_arg import XComArg
   from airflow.providers.amazon.aws.hooks.s3 import S3Hook
   from airflow.providers.amazon.aws.operators.s3 import S3ListOperator
   
   
   with DAG(dag_id='mapped_s3', start_date=datetime(2022, 5, 19)) as dag:
       # Lists keys under a date-partitioned prefix (templated on
       # data_interval_end). do_xcom_push=True publishes the result to XCom so
       # it can feed .expand() below.
       files = S3ListOperator(
           task_id="get_inputs",
           bucket="airflow-summit-2022",
           prefix="data_provider_a/{{ data_interval_end | ds }}/",
           delimiter='/',
           do_xcom_push=True,
       )
   
       @task
       def csv_to_json(aws_conn_id, input_bucket, key, output_bucket):
           """Read one CSV object from S3 and write it back as one JSON object per line."""
           hook = S3Hook(aws_conn_id=aws_conn_id)
   
           csv_data = hook.read_key(key, input_bucket)
           reader = csv.DictReader(io.StringIO(csv_data))
   
           # Buffer the newline-delimited JSON output in memory as UTF-8 bytes.
           output = io.BytesIO()
   
           for row in reader:
               output.write(json.dumps(row, indent=None).encode('utf-8'))
               output.write(b"\n")
   
           # Rewind so the upload reads the buffer from the beginning.
           output.seek(0, os.SEEK_SET)
           # NOTE(review): str.replace swaps every ".csv" occurrence in the key,
           # not only the file suffix.
           # NOTE(review): load_file_obj may refuse to overwrite an existing key
           # by default, so re-runs could fail; confirm against the S3Hook docs.
           hook.load_file_obj(output, key=key.replace(".csv", ".json"), 
bucket_name=output_bucket)
   
       # partial() binds the arguments shared by every mapped instance.
       # expand() maps `key` over the XCom output of get_inputs. This is the
       # mapped task the follow-on mini-scheduler fails to expand.
       # NOTE(review): files.bucket is a plain attribute read at parse time,
       # presumably the bucket string passed to S3ListOperator above.
       csv_to_json.partial(
           aws_conn_id="aws_default", input_bucket=files.bucket, 
output_bucket="airflow-summit-2022-processed"
       ).expand(key=XComArg(files))
   ```
   
   Error:
   
   ```
     File "/home/ash/code/airflow/airflow/airflow/jobs/local_task_job.py", line 253, in _run_mini_scheduler_on_child_tasks
       info = dag_run.task_instance_scheduling_decisions(session)
     File "/home/ash/code/airflow/airflow/airflow/utils/session.py", line 68, in wrapper
       return func(*args, **kwargs)
     File "/home/ash/code/airflow/airflow/airflow/models/dagrun.py", line 658, in task_instance_scheduling_decisions
       schedulable_tis, changed_tis, expansion_happened = self._get_ready_tis(
     File "/home/ash/code/airflow/airflow/airflow/models/dagrun.py", line 714, in _get_ready_tis
       expanded_tis, _ = schedulable.task.expand_mapped_task(self.run_id, session=session)
     File "/home/ash/code/airflow/airflow/airflow/models/mappedoperator.py", line 609, in expand_mapped_task
       operator.mul, self._resolve_map_lengths(run_id, session=session).values()
     File "/home/ash/code/airflow/airflow/airflow/models/mappedoperator.py", line 591, in _resolve_map_lengths
       expansion_kwargs = self._get_expansion_kwargs()
     File "/home/ash/code/airflow/airflow/airflow/models/mappedoperator.py", line 526, in _get_expansion_kwargs
       return getattr(self, self._expansion_kwargs_attr)
   AttributeError: 'MappedOperator' object has no attribute 'mapped_op_kwargs'
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to