ashb commented on PR #21877:
URL: https://github.com/apache/airflow/pull/21877#issuecomment-1133409500
I've just noticed that this causes a problem for the follow-on mini
scheduler for mapped tasks. I guess that code path wasn't sufficiently unit
tested.
DAG:
```
import csv
import io
import os
import json
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.models.xcom_arg import XComArg
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.s3 import S3ListOperator
# Reproduction DAG for the mini-scheduler failure reported in this comment.
# ``get_inputs`` lists the bucket under a per-day prefix and pushes the result
# to XCom; ``csv_to_json`` is then expanded over that XCom value (presumably
# one mapped task instance per listed key -- confirm against the operator).
with DAG(dag_id='mapped_s3', start_date=datetime(2022, 5, 19)) as dag:
    files = S3ListOperator(
        task_id="get_inputs",
        bucket="airflow-summit-2022",
        prefix="data_provider_a/{{ data_interval_end | ds }}/",
        delimiter='/',
        do_xcom_push=True,
    )

    # Read one CSV object from ``input_bucket``, re-encode each row as a
    # single-line JSON document, and upload the result to ``output_bucket``
    # under the same key with ".csv" replaced by ".json".
    # (Plain comments rather than a docstring, so the decorated function's
    # ``__doc__`` stays exactly as in the original report.)
    @task
    def csv_to_json(aws_conn_id, input_bucket, key, output_bucket):
        hook = S3Hook(aws_conn_id=aws_conn_id)
        csv_data = hook.read_key(key, input_bucket)
        reader = csv.DictReader(io.StringIO(csv_data))

        output = io.BytesIO()
        for row in reader:
            # One JSON object per line, each terminated by a newline.
            output.write(json.dumps(row, indent=None).encode('utf-8'))
            output.write(b"\n")

        # Rewind so the upload reads from the first byte written above.
        output.seek(0, os.SEEK_SET)
        hook.load_file_obj(output, key=key.replace(".csv", ".json"),
                           bucket_name=output_bucket)

    # Fixed arguments go through ``partial``; only ``key`` is mapped.
    csv_to_json.partial(
        aws_conn_id="aws_default", input_bucket=files.bucket,
        output_bucket="airflow-summit-2022-processed"
    ).expand(key=XComArg(files))
```
Error:
```
  File "/home/ash/code/airflow/airflow/airflow/jobs/local_task_job.py", line 253, in _run_mini_scheduler_on_child_tasks
    info = dag_run.task_instance_scheduling_decisions(session)
  File "/home/ash/code/airflow/airflow/airflow/utils/session.py", line 68, in wrapper
    return func(*args, **kwargs)
  File "/home/ash/code/airflow/airflow/airflow/models/dagrun.py", line 658, in task_instance_scheduling_decisions
    schedulable_tis, changed_tis, expansion_happened = self._get_ready_tis(
  File "/home/ash/code/airflow/airflow/airflow/models/dagrun.py", line 714, in _get_ready_tis
    expanded_tis, _ = schedulable.task.expand_mapped_task(self.run_id, session=session)
  File "/home/ash/code/airflow/airflow/airflow/models/mappedoperator.py", line 609, in expand_mapped_task
    operator.mul, self._resolve_map_lengths(run_id, session=session).values()
  File "/home/ash/code/airflow/airflow/airflow/models/mappedoperator.py", line 591, in _resolve_map_lengths
    expansion_kwargs = self._get_expansion_kwargs()
  File "/home/ash/code/airflow/airflow/airflow/models/mappedoperator.py", line 526, in _get_expansion_kwargs
    return getattr(self, self._expansion_kwargs_attr)
AttributeError: 'MappedOperator' object has no attribute 'mapped_op_kwargs'
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]