dstandish commented on code in PR #24489:
URL: https://github.com/apache/airflow/pull/24489#discussion_r899768327


##########
docs/apache-airflow/concepts/dynamic-task-mapping.rst:
##########
@@ -208,27 +210,156 @@ In this example you have a regular data delivery to an S3 bucket and want to app
 
 
     with DAG(dag_id="mapped_s3", start_date=datetime(2020, 4, 7)) as dag:
-        files = S3ListOperator(
+        list_filenames = S3ListOperator(
             task_id="get_input",
             bucket="example-bucket",
             prefix='incoming/provider_a/{{ data_interval_start.strftime("%Y-%m-%d") }}',
         )
 
         @task
-        def count_lines(aws_conn_id, bucket, file):
+        def count_lines(aws_conn_id, bucket, filename):
             hook = S3Hook(aws_conn_id=aws_conn_id)
 
-            return len(hook.read_key(file, bucket).splitlines())
+            return len(hook.read_key(filename, bucket).splitlines())
 
         @task
         def total(lines):
             return sum(lines)
 
-        counts = count_lines.partial(aws_conn_id="aws_default", bucket=files.bucket).expand(
-            file=XComArg(files)
-        )
+        counts = count_lines.partial(
+            aws_conn_id="aws_default", bucket=list_filenames.bucket
+        ).expand(filename=XComArg(list_filenames))
+
         total(lines=counts)
 
+Assigning multiple parameters to a non-TaskFlow operator
+========================================================
+
+Sometimes an upstream task needs to specify multiple arguments to a downstream operator. To do this, you can use the ``expand_kwargs`` function, which takes a sequence of mappings to map against.
+
+.. code-block:: python
+
+    BashOperator.partial(task_id="bash").expand_kwargs(
+        [
+            {"bash_command": "echo $ENV1", "env": {"ENV1": "1"}},
+            {"bash_command": "printf $ENV2", "env": {"ENV2": "2"}},
+        ],
+    )
+
+This produces two task instances at run-time printing ``1`` and ``2`` respectively.
+
+Similar to ``expand``, you can also map against an XCom that returns a list of dicts, or a list of XComs each returning a dict. Re-using the S3 example above, you can use a mapped task to perform “branching” and copy files to different buckets:
+
+.. code-block:: python
+
+    list_filenames = S3ListOperator(...)  # Same as the above example.
+
+
+    @task
+    def create_copy_kwargs(filename):
+        if filename.rsplit(".", 1)[-1] not in ("json", "yml"):
+            dest_bucket_name = "my_text_bucket"
+        else:
+            dest_bucket_name = "my_other_bucket"
+        return {
+            "source_bucket_key": filename,
+            "dest_bucket_key": filename,
+            "dest_bucket_name": dest_bucket_name,
+        }
+
+
+    copy_kwargs = create_copy_kwargs.expand(filename=XComArg(list_filenames))
+
+    # Copy files to another bucket, based on the file's extension.
+    copy_filenames = S3CopyObjectOperator.partial(
+        task_id="copy_files", source_bucket_name=list_filenames.bucket
+    ).expand_kwargs(copy_kwargs)
+
+Filtering items from an expanded task
+=====================================
+
+A mapped task can exclude elements from being passed on to its downstream tasks by returning ``None``. For example, if we want to copy *only* files with certain extensions from one S3 bucket to another, we could implement ``create_copy_kwargs`` like this instead:
+
+.. code-block:: python
+
+    @task
+    def create_copy_kwargs(filename):
+        # Skip files not ending with these suffixes.
+        if filename.rsplit(".", 1)[-1] not in ("json", "yml"):
+            return None
+        return {
+            "source_bucket_key": filename,
+            "dest_bucket_key": filename,
+            "dest_bucket_name": "my_other_bucket",
+        }
+
+
+    # copy_kwargs and copy_files are implemented the same.
+
+This makes ``copy_files`` only expand against ``.txt``, ``.json``, and ``.yml`` files, while ignoring the rest.

Review Comment:
   ```suggestion
   This makes ``copy_files`` only expand against ``.json`` and ``.yml`` files, while ignoring the rest.
   ```
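For anyone skimming this thread, the fan-out semantics of ``expand_kwargs`` in the hunk above can be sketched in plain Python. This is a minimal illustration assuming a POSIX shell, not Airflow's actual scheduler code; ``run_bash`` is a hypothetical stand-in for ``BashOperator``'s execution:

```python
import subprocess

# Hypothetical stand-in for BashOperator execution (not Airflow API):
# each expanded "task instance" receives one mapping as its kwargs.
def run_bash(bash_command, env):
    # Assumes a POSIX shell where echo/printf are builtins.
    result = subprocess.run(
        bash_command, shell=True, env=env, capture_output=True, text=True
    )
    return result.stdout.strip()


# The sequence of mappings passed to expand_kwargs in the docs example.
expanded_kwargs = [
    {"bash_command": "echo $ENV1", "env": {"ENV1": "1"}},
    {"bash_command": "printf $ENV2", "env": {"ENV2": "2"}},
]

# expand_kwargs produces one task instance per mapping.
outputs = [run_bash(**kwargs) for kwargs in expanded_kwargs]
print(outputs)  # ['1', '2']
```

This matches the prose in the hunk: two task instances, printing ``1`` and ``2`` respectively.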
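The ``None``-filtering behavior documented in the hunk can likewise be sketched in plain Python. ``create_copy_kwargs`` below mirrors the task from the docs; the final list comprehension stands in for how Airflow drops ``None`` results before expanding the downstream task (an illustrative assumption, not the scheduler's internals):

```python
# Mirrors the create_copy_kwargs task from the docs hunk.
def create_copy_kwargs(filename):
    # Skip files not ending with these suffixes.
    if filename.rsplit(".", 1)[-1] not in ("json", "yml"):
        return None
    return {
        "source_bucket_key": filename,
        "dest_bucket_key": filename,
        "dest_bucket_name": "my_other_bucket",
    }


# Hypothetical upstream listing result, for illustration only.
filenames = ["a.json", "b.txt", "c.yml", "d.csv"]
results = [create_copy_kwargs(f) for f in filenames]

# Airflow skips None results when expanding the downstream task, so
# only .json and .yml entries produce copy_files task instances.
copy_kwargs = [r for r in results if r is not None]
print([kw["source_bucket_key"] for kw in copy_kwargs])  # ['a.json', 'c.yml']
```

Which is exactly why the suggestion above drops ``.txt`` from the closing sentence: ``b.txt`` never reaches ``copy_files``.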



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
