ashb opened a new pull request, #22862:
URL: https://github.com/apache/airflow/pull/22862

   In writing the docs for Dynamic Task Mapping (AIP-42) I noticed that
   there are some cases where users need to use XComArg directly, and it
   didn't feel right to make them import things from `airflow.models`.
   
   And I've now refactored the lazy import to be "data-driven", as three
   blocks of almost-identical code were my limit.
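   
   For illustration, a "data-driven" lazy import along these lines can be
   built on PEP 562's module-level `__getattr__`: a single mapping names the
   attributes and the modules that provide them, replacing several
   near-identical import blocks. This is only a sketch of the pattern, not
   the actual diff; the mapping entries below use stdlib stand-ins rather
   than real Airflow paths:
   
   ```python
   import importlib
   
   # The mapping drives the lazy imports: attribute name -> providing module.
   # (Stdlib stand-ins, purely illustrative; the real table would point at
   # modules such as airflow.models.xcom_arg.)
   _lazy_imports = {
       "sqrt": "math",
       "dataclass": "dataclasses",
       "Path": "pathlib",
   }
   
   
   def __getattr__(name):
       """PEP 562 module __getattr__: import on first access, then cache."""
       try:
           module_path = _lazy_imports[name]
       except KeyError:
           raise AttributeError(
               f"module {__name__!r} has no attribute {name!r}"
           )
       value = getattr(importlib.import_module(module_path), name)
       globals()[name] = value  # cache so later lookups bypass __getattr__
       return value
   ```
   
   Adding a new lazily-imported name then becomes a one-line change to the
   mapping instead of another copy of the import block.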
   
   Example dag from the docs (future PR) that inspired this change:
   
   ```python
    from datetime import datetime

    from airflow import DAG
    from airflow.decorators import task
    from airflow.models.xcom_arg import XComArg  # <--- this I wasn't happy about
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook
    from airflow.providers.amazon.aws.operators.s3 import S3ListOperator


    with DAG(dag_id="mapped_s3", start_date=datetime(2020, 4, 7)) as dag:
        files = S3ListOperator(
            task_id="get_input",
            bucket="example-bucket",
            prefix='incoming/provider_a/{{ data_interval_start.strftime("%Y-%m-%d") }}',
        )

        @task
        def count_lines(aws_conn_id, bucket, file):
            hook = S3Hook(aws_conn_id=aws_conn_id)
            return len(hook.read_key(file, bucket).splitlines())

        @task
        def total(lines):
            return sum(lines)

        counts = count_lines.partial(
            aws_conn_id="aws_default", bucket=files.bucket
        ).expand(file=XComArg(files))
        total(lines=counts)
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
